In [1]:
##########################################################################################################
# VERSION  	DESARROLLADOR             FECHA        DESCRIPCION
# -------------------------------------------------------------
#  1        Walter Albites Azarte     10/12/2021   Curso PySpark Entorno Local - Dataframe
##########################################################################################################

In [2]:
# Locate the local Spark installation before pyspark is imported.
# NOTE(review): presumably init() makes pyspark importable from this kernel — confirm against the findspark docs.
import findspark
findspark.init()
# Last expression of the cell: the value returned by find() is displayed as the cell output.
findspark.find()

'C:\\spark-3.1.2-bin-hadoop2.7'

In [3]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Local Spark configuration used by the whole notebook.
conf = SparkConf().setAppName('SparkApp').setMaster('local')
# getOrCreate() reuses a context that is already running, so re-running this cell
# in the same kernel no longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# Obtain the session through the builder (reusing an existing one) instead of
# calling the SparkSession constructor directly.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [5]:
### Create the Spark DataFrame "personas"
# Explicit schema: dni and nombre are strings, edad is an integer; all three are nullable.
schemaPersona = StructType([
    StructField("dni", StringType(),True),
    StructField("nombre", StringType(),True),
    StructField("edad", IntegerType(),True)
])

# Sample rows. Some are deliberately "dirty": one name has a leading space and a
# trailing line break, one name is None, one name is an empty string, and two ages are None.
dataPersona = ([("99999999", "Walter",35),
                ("88888888", "Susan",30),
                ("77777777", "Alejandro",12),
                ("66666666", "Pedro",30),
                ("55555555", "Karina",35),
                ("44444444", "Andy",26),
                ("33333333", " Raquel\n",45),
                ("22222222", "Gian",28),
                ("11111111", "Raul",45),
                ("00000000", "Elena",40),
                ("10101010", None, None),
                ("20202020",'',None)])

# Build the DataFrame with the explicit schema and display it.
df_personas=spark.createDataFrame(dataPersona,schema=schemaPersona)
df_personas.show()

+--------+---------+----+
|     dni|   nombre|edad|
+--------+---------+----+
|99999999|   Walter|  35|
|88888888|    Susan|  30|
|77777777|Alejandro|  12|
|66666666|    Pedro|  30|
|55555555|   Karina|  35|
|44444444|     Andy|  26|
|33333333|  Raquel
|  45|
|22222222|     Gian|  28|
|11111111|     Raul|  45|
|00000000|    Elena|  40|
|10101010|     null|null|
|20202020|         |null|
+--------+---------+----+



In [6]:
# Print the column names, types and nullability of df_personas.
df_personas.printSchema()

root
 |-- dni: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: integer (nullable = true)



In [7]:
# Count, per column, how many values contain a line break.
df_personas.select([count(when( col(c).contains('\n'), c)).alias(c) for c in df_personas.columns]).show(5,False)

+---+------+----+
|dni|nombre|edad|
+---+------+----+
|0  |1     |0   |
+---+------+----+



In [8]:
# Fix for a column found to contain line breaks (for example nombre): remove every line break.
# df_personas is reassigned here, so the cells that follow work with the cleaned names.
df_personas=df_personas.withColumn('nombre',regexp_replace('nombre', '\n', ''))
df_personas.show(20)

+--------+---------+----+
|     dni|   nombre|edad|
+--------+---------+----+
|99999999|   Walter|  35|
|88888888|    Susan|  30|
|77777777|Alejandro|  12|
|66666666|    Pedro|  30|
|55555555|   Karina|  35|
|44444444|     Andy|  26|
|33333333|   Raquel|  45|
|22222222|     Gian|  28|
|11111111|     Raul|  45|
|00000000|    Elena|  40|
|10101010|     null|null|
|20202020|         |null|
+--------+---------+----+



In [9]:
# concat_ws() concatenates several string columns into a single column, placing the
# given separator/delimiter between them (a comma here, joining dni and nombre).
df_personas.withColumn('datos_personales', concat_ws(',',col('dni'),col('nombre'))).show(5,False)

+--------+---------+----+------------------+
|dni     |nombre   |edad|datos_personales  |
+--------+---------+----+------------------+
|99999999|Walter   |35  |99999999,Walter   |
|88888888|Susan    |30  |88888888,Susan    |
|77777777|Alejandro|12  |77777777,Alejandro|
|66666666|Pedro    |30  |66666666,Pedro    |
|55555555|Karina   |35  |55555555,Karina   |
+--------+---------+----+------------------+
only showing top 5 rows



In [10]:
# coalesce() returns the first column that is not null.
# Sample frame with two candidate document columns: one row with both values null,
# one with only dni_1 set and one with only dni_2 set.
df_documentoidentidad = spark.createDataFrame([(None, None), (88888888, None), (None, 99999999)], ("dni_1", "dni_2"))
df_documentoidentidad.show()

+--------+--------+
|   dni_1|   dni_2|
+--------+--------+
|    null|    null|
|88888888|    null|
|    null|99999999|
+--------+--------+



In [11]:
# Take dni_1 when it is not null, otherwise dni_2; the result is exposed as a single "dni" column.
df_documentoidentidad.select(coalesce(col("dni_1"), col("dni_2")).alias("dni")).show()

+--------+
|     dni|
+--------+
|    null|
|88888888|
|99999999|
+--------+



In [12]:
# Display the current contents of df_personas before the UDF examples.
df_personas.show()

+--------+---------+----+
|     dni|   nombre|edad|
+--------+---------+----+
|99999999|   Walter|  35|
|88888888|    Susan|  30|
|77777777|Alejandro|  12|
|66666666|    Pedro|  30|
|55555555|   Karina|  35|
|44444444|     Andy|  26|
|33333333|   Raquel|  45|
|22222222|     Gian|  28|
|11111111|     Raul|  45|
|00000000|    Elena|  40|
|10101010|     null|null|
|20202020|         |null|
+--------+---------+----+



In [13]:
### Ejemplos utilizando UDF

In [14]:
# Helper that converts a string to lower case
def ConvertirMinusculas(texto):
    """Return ``texto`` converted to lower case.

    ``texto`` must be a string: the function calls ``texto.lower()`` directly,
    so passing ``None`` raises ``AttributeError``.
    """
    return texto.lower()

In [15]:
# Wrap the Python function as a Spark UDF whose declared return type is string.
udf_convertir_misnusculas=udf(ConvertirMinusculas,StringType())

In [16]:
# Build a lower-cased, normalised column from the nombre column. This raises an error: apparently the UDF
# receives every row of the DataFrame, including the null and empty ones, even though the conditional is used

#df_personasy=df_personas.withColumn("nombre_normalizado",when(((col("nombre").isNotNull()) & (col("nombre")!="")),
#                                         udf_convertir_misnusculas("nombre")).otherwise(""))

#df_personasy.show()

# AttributeError: 'NoneType' object has no attribute 'lower'

In [17]:
# Solution 1
# Copy nombre into a temporary column and replace the NULLs of that copy with empty strings,
# so the original nombre column is left untouched.
df_personasz=df_personas.select('*',col("nombre").alias("nombre_temp"))
df_personasz=df_personasz.na.fill(value="",subset=["nombre_temp"])
df_personasz.show()

+--------+---------+----+-----------+
|     dni|   nombre|edad|nombre_temp|
+--------+---------+----+-----------+
|99999999|   Walter|  35|     Walter|
|88888888|    Susan|  30|      Susan|
|77777777|Alejandro|  12|  Alejandro|
|66666666|    Pedro|  30|      Pedro|
|55555555|   Karina|  35|     Karina|
|44444444|     Andy|  26|       Andy|
|33333333|   Raquel|  45|     Raquel|
|22222222|     Gian|  28|       Gian|
|11111111|     Raul|  45|       Raul|
|00000000|    Elena|  40|      Elena|
|10101010|     null|null|           |
|20202020|         |null|           |
+--------+---------+----+-----------+



In [18]:
# Apply the non-empty filter and lower-case the name through the UDF,
# then discard the temporary helper column.
df_personasz = (
    df_personasz
    .withColumn(
        "nombre_normalizado",
        when(col("nombre_temp") != "", udf_convertir_misnusculas("nombre_temp")).otherwise(""),
    )
    .drop("nombre_temp")
)
df_personasz.show(20, False)

+--------+---------+----+------------------+
|dni     |nombre   |edad|nombre_normalizado|
+--------+---------+----+------------------+
|99999999|Walter   |35  |walter            |
|88888888|Susan    |30  |susan             |
|77777777|Alejandro|12  |alejandro         |
|66666666|Pedro    |30  |pedro             |
|55555555|Karina   |35  |karina            |
|44444444|Andy     |26  |andy              |
|33333333| Raquel  |45  | raquel           |
|22222222|Gian     |28  |gian              |
|11111111|Raul     |45  |raul              |
|00000000|Elena    |40  |elena             |
|10101010|null     |null|                  |
|20202020|         |null|                  |
+--------+---------+----+------------------+



In [19]:
# Solution 2
# Make the function itself null-aware: None is mapped to an empty string.
def ConvertirMinusculas(texto):
    """Return ``texto`` in lower case, or ``""`` when ``texto`` is ``None``.

    Args:
        texto: the string to convert, or ``None``.

    Returns:
        The lower-cased string; an empty string for ``None`` input.
    """
    # Identity test instead of ``== None``: it cannot be affected by a custom __eq__.
    if texto is None:
        return ""
    return texto.lower()

# Re-create the UDF so that it wraps the null-aware version of the function.
udf_convertir_misnusculas=udf(ConvertirMinusculas,StringType())

In [20]:
# Lower-case nombre through the UDF when it is neither null nor empty;
# every other row gets an empty string.
df_personasm = df_personas.withColumn(
    "nombre_normalizado",
    when(
        col("nombre").isNotNull() & (col("nombre") != ""),
        udf_convertir_misnusculas("nombre"),
    ).otherwise(""),
)
df_personasm.show(20, False)

+--------+---------+----+------------------+
|dni     |nombre   |edad|nombre_normalizado|
+--------+---------+----+------------------+
|99999999|Walter   |35  |walter            |
|88888888|Susan    |30  |susan             |
|77777777|Alejandro|12  |alejandro         |
|66666666|Pedro    |30  |pedro             |
|55555555|Karina   |35  |karina            |
|44444444|Andy     |26  |andy              |
|33333333| Raquel  |45  | raquel           |
|22222222|Gian     |28  |gian              |
|11111111|Raul     |45  |raul              |
|00000000|Elena    |40  |elena             |
|10101010|null     |null|                  |
|20202020|         |null|                  |
+--------+---------+----+------------------+



In [21]:
# Second name for the same DataFrame; the renaming example reassigns df_personas_x and leaves df_personas as is.
df_personas_x=df_personas

In [22]:
# Rename every column with withColumnRenamed, one call per column.
# The original author notes that this approach uses a lot of memory — NOTE(review): not measured here.
for nombre_original in df_personas_x.columns:
    nombre_nuevo = 'personas_' + nombre_original
    df_personas_x = df_personas_x.withColumnRenamed(nombre_original, nombre_nuevo)

In [23]:
# Show the first 5 rows without truncating values to check the renamed columns.
df_personas_x.show(5,False)

+------------+---------------+-------------+
|personas_dni|personas_nombre|personas_edad|
+------------+---------------+-------------+
|99999999    |Walter         |35           |
|88888888    |Susan          |30           |
|77777777    |Alejandro      |12           |
|66666666    |Pedro          |30           |
|55555555    |Karina         |35           |
+------------+---------------+-------------+
only showing top 5 rows



In [24]:
# Rename columns by selecting each one under an alias, in a single select.
# The original author recommends this approach as lighter on memory — NOTE(review): not measured here.
df_personas_y=df_personas.select([col(i).alias('persons_'+i) for i in df_personas.columns])
df_personas_y.show()

+-----------+--------------+------------+
|persons_dni|persons_nombre|persons_edad|
+-----------+--------------+------------+
|   99999999|        Walter|          35|
|   88888888|         Susan|          30|
|   77777777|     Alejandro|          12|
|   66666666|         Pedro|          30|
|   55555555|        Karina|          35|
|   44444444|          Andy|          26|
|   33333333|        Raquel|          45|
|   22222222|          Gian|          28|
|   11111111|          Raul|          45|
|   00000000|         Elena|          40|
|   10101010|          null|        null|
|   20202020|              |        null|
+-----------+--------------+------------+



In [25]:
# Count, per column, the values that are NULL, NaN or an empty string.
df_personas.select([count(when( (isnan(c))| (col(c).isNull()) | (col(c)==''), c)).alias(c) for c in df_personas.columns]).show()

+---+------+----+
|dni|nombre|edad|
+---+------+----+
|  0|     2|   2|
+---+------+----+



In [26]:
# Count, per column, the values that contain a non-breaking space character (anywhere in the value).
# NOTE(review): this does not detect ordinary leading/trailing spaces — confirm whether a trim-based check is also wanted.
df_personas.select([count(when( col(c).contains('\xa0'), c)).alias(c) for c in df_personas.columns]).show(5,False)

+---+------+----+
|dni|nombre|edad|
+---+------+----+
|0  |0     |0   |
+---+------+----+



In [27]:
# Add a column holding the current system date.
df_personas.withColumn('fecha',current_date()).show()

+--------+---------+----+----------+
|     dni|   nombre|edad|     fecha|
+--------+---------+----+----------+
|99999999|   Walter|  35|2022-06-02|
|88888888|    Susan|  30|2022-06-02|
|77777777|Alejandro|  12|2022-06-02|
|66666666|    Pedro|  30|2022-06-02|
|55555555|   Karina|  35|2022-06-02|
|44444444|     Andy|  26|2022-06-02|
|33333333|   Raquel|  45|2022-06-02|
|22222222|     Gian|  28|2022-06-02|
|11111111|     Raul|  45|2022-06-02|
|00000000|    Elena|  40|2022-06-02|
|10101010|     null|null|2022-06-02|
|20202020|         |null|2022-06-02|
+--------+---------+----+----------+



In [28]:
# Add a column holding the current system date and time.
# current_timestamp() already produces a timestamp column, so wrapping it in
# to_timestamp() with a parsing format added nothing and suggested a format that is never applied.
df_personas.withColumn("fechaHora",current_timestamp()).show(5,False)

+--------+---------+----+-----------------------+
|dni     |nombre   |edad|fechaHora              |
+--------+---------+----+-----------------------+
|99999999|Walter   |35  |2022-06-02 10:02:54.264|
|88888888|Susan    |30  |2022-06-02 10:02:54.264|
|77777777|Alejandro|12  |2022-06-02 10:02:54.264|
|66666666|Pedro    |30  |2022-06-02 10:02:54.264|
|55555555|Karina   |35  |2022-06-02 10:02:54.264|
+--------+---------+----+-----------------------+
only showing top 5 rows



In [29]:
# Capture the date and time at which execution starts
from datetime import datetime
# x holds the current local date-time rendered as text.
x=str(datetime.now())
print(x)

2022-06-02 10:02:54.881479


In [30]:
# Dates in plain Python: print today's date formatted as day/month/two-digit year
import time
print (time.strftime("%d/%m/%y"))

02/06/22


In [31]:
# Add a constant string column with today's date; the text is built in Python and attached as a literal.
df_personas.withColumn('date',lit(time.strftime("%d/%m/%y"))).show()

+--------+---------+----+--------+
|     dni|   nombre|edad|    date|
+--------+---------+----+--------+
|99999999|   Walter|  35|02/06/22|
|88888888|    Susan|  30|02/06/22|
|77777777|Alejandro|  12|02/06/22|
|66666666|    Pedro|  30|02/06/22|
|55555555|   Karina|  35|02/06/22|
|44444444|     Andy|  26|02/06/22|
|33333333|   Raquel|  45|02/06/22|
|22222222|     Gian|  28|02/06/22|
|11111111|     Raul|  45|02/06/22|
|00000000|    Elena|  40|02/06/22|
|10101010|     null|null|02/06/22|
|20202020|         |null|02/06/22|
+--------+---------+----+--------+



In [32]:
# Maximum string length of the values in each column (one query and one table per column).
# NOTE(review): max is applied to a Column here, so it is presumably the Spark function brought in
# by the wildcard import of pyspark.sql.functions rather than Python's builtin — confirm.
for i in df_personas.columns:
    df_personas.select(max(length(i))).show()

+----------------+
|max(length(dni))|
+----------------+
|               8|
+----------------+

+-------------------+
|max(length(nombre))|
+-------------------+
|                  9|
+-------------------+

+-----------------+
|max(length(edad))|
+-----------------+
|                2|
+-----------------+



In [33]:
################### Join parametrizable en Funciones Pyspark

In [34]:
# Create the Spark DataFrame "cursos"
# Explicit schema: dni and curso are strings, precio is a double; all three are nullable.
schemaCursos = StructType([
    StructField("dni", StringType(),True),
    StructField("curso", StringType(),True),
    StructField("precio", DoubleType(),True)
])

# Sample rows: the same dni can appear in several rows (one per course).
dataCurso = ([("99999999", "Spark",100.50),
              ("99999999", "Scala",100.50),
              ("99999999", "Java",100.50),
              ("88888888", "Ingles",80.90),
              ("77777777", "Java",12.50)
])

# Build the DataFrame with the explicit schema and display it.
df_cursos=spark.createDataFrame(dataCurso,schema=schemaCursos)
df_cursos.show()

+--------+------+------+
|     dni| curso|precio|
+--------+------+------+
|99999999| Spark| 100.5|
|99999999| Scala| 100.5|
|99999999|  Java| 100.5|
|88888888|Ingles|  80.9|
|77777777|  Java|  12.5|
+--------+------+------+



In [35]:
def join_example(dfa, dfb, on="dni", how="inner"):
    """Join two Spark DataFrames on a key column that both of them contain.

    Args:
        dfa: left DataFrame; aliased as "a" inside the join.
        dfb: right DataFrame; aliased as "b" inside the join.
        on: name of the key column present in both frames (default "dni").
        how: join type passed to DataFrame.join (default "inner").

    Returns:
        The joined DataFrame. The key column of both inputs is kept, so the
        result contains that column twice.
    """
    # Qualify the key with each alias so the equality compares the two sides.
    condicion = col("a." + on) == col("b." + on)
    return dfa.alias("a").join(dfb.alias("b"), condicion, how=how)

# Join persons with their courses and display the result.
dfa_whith_dfb=join_example(df_personas,df_cursos)
dfa_whith_dfb.show()

+--------+---------+----+--------+------+------+
|     dni|   nombre|edad|     dni| curso|precio|
+--------+---------+----+--------+------+------+
|88888888|    Susan|  30|88888888|Ingles|  80.9|
|99999999|   Walter|  35|99999999| Spark| 100.5|
|99999999|   Walter|  35|99999999| Scala| 100.5|
|99999999|   Walter|  35|99999999|  Java| 100.5|
|77777777|Alejandro|  12|77777777|  Java|  12.5|
+--------+---------+----+--------+------+------+



In [36]:
################### Join parametrizable en Funciones Pandas to Pyspark

In [37]:
# Convert both Spark DataFrames to pandas DataFrames.
df_personas_pandas=df_personas.toPandas()
df_cursos_pandas=df_cursos.toPandas()

def merge_example_pandas(dfa, dfb, on="dni", how="inner"):
    """Merge two pandas DataFrames on a shared key column.

    Args:
        dfa: left pandas DataFrame.
        dfb: right pandas DataFrame.
        on: key column (or list of columns) present in both frames (default "dni").
        how: merge type passed to DataFrame.merge (default "inner").

    Returns:
        The merged pandas DataFrame; the key appears once in the result.
    """
    return dfa.merge(dfb, how=how, on=on)

# Merge the two pandas frames on dni and preview the first rows.
dfa_whith_dfb_pandas=merge_example_pandas(df_personas_pandas,df_cursos_pandas)

dfa_whith_dfb_pandas.head()

Unnamed: 0,dni,nombre,edad,curso,precio
0,99999999,Walter,35.0,Spark,100.5
1,99999999,Walter,35.0,Scala,100.5
2,99999999,Walter,35.0,Java,100.5
3,88888888,Susan,30.0,Ingles,80.9
4,77777777,Alejandro,12.0,Java,12.5


In [38]:
# Convert the merged pandas frame back to a Spark DataFrame (schema inferred from the pandas data).
# Note: edad is displayed as 35.0, 30.0, ... here, i.e. it no longer comes back as an integer column.
dfa_whith_dfb_pandas_spark = spark.createDataFrame(dfa_whith_dfb_pandas)
dfa_whith_dfb_pandas_spark.show()

+--------+---------+----+------+------+
|     dni|   nombre|edad| curso|precio|
+--------+---------+----+------+------+
|99999999|   Walter|35.0| Spark| 100.5|
|99999999|   Walter|35.0| Scala| 100.5|
|99999999|   Walter|35.0|  Java| 100.5|
|88888888|    Susan|30.0|Ingles|  80.9|
|77777777|Alejandro|12.0|  Java|  12.5|
+--------+---------+----+------+------+



In [39]:
# Counts: non-null values per column and the percentage they represent
total = df_personas.count()

# count(column) skips NULLs, so a single aggregation returns the non-null count of
# every column instead of launching one Spark job per column.
conteos = df_personas.select([count(col(c)).alias(c) for c in df_personas.columns]).first()

for i in df_personas.columns:
    conteo = conteos[i]
    # Guard against an empty DataFrame, where the percentage would divide by zero.
    porcentaje = (conteo / total) * 100 if total else 0.0
    print(i, conteo, porcentaje)

dni 12 100.0
nombre 11 91.66666666666666
edad 10 83.33333333333334


In [40]:
# Read a CSV so that line breaks inside values are recognised
#df_personas = spark.read.options(header='True',inferSchema='True',delimiter=',',escape='"',quote='"',multiLine=True).\
#csv(root + 'personas.csv')

In [41]:
# Start: transform a pandas DataFrame into a Spark DataFrame
# Auxiliary functions
def equivalent_type(f):
    """Map a pandas/numpy dtype to the Spark SQL type used for that column.

    Args:
        f: the dtype of a pandas column (compared against dtype names).

    Returns:
        TimestampType for datetime64[ns], LongType for int64, IntegerType for
        int32, DoubleType for float64, FloatType for float32, and StringType
        for anything else.
    """
    if f == 'datetime64[ns]':
        return TimestampType()
    elif f == 'int64':
        return LongType()
    elif f == 'int32':
        return IntegerType()
    elif f == 'float64':
        # float64 is double precision; the 32-bit FloatType would round the values.
        return DoubleType()
    elif f == 'float32':
        return FloatType()
    else:
        return StringType()

def define_structure(string, format_type):
    """Build the StructField describing one column.

    Args:
        string: the column name.
        format_type: the pandas dtype of the column.

    Returns:
        A StructField named ``string`` whose type comes from
        ``equivalent_type``; StringType is used if that lookup raises.
    """
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # Best-effort fallback kept from the original, but without a bare
        # ``except`` so KeyboardInterrupt/SystemExit are no longer swallowed.
        typo = StringType()
    return StructField(string, typo)

# Given a pandas DataFrame, return the equivalent Spark DataFrame.
def pandas_to_spark(pandas_df):
    """Convert ``pandas_df`` into a Spark DataFrame with an explicit schema.

    One StructField is built per column (via ``define_structure``) from the
    column name and its pandas dtype; the resulting StructType is passed to
    ``spark.createDataFrame`` together with the data.
    """
    nombres = list(pandas_df.columns)
    tipos = list(pandas_df.dtypes)
    campos = [define_structure(nombre, tipo) for nombre, tipo in zip(nombres, tipos)]
    return spark.createDataFrame(pandas_df, StructType(campos))

In [42]:
# Create a sample DataFrame in pandas
import pandas as pd

# Two text columns: player name and position.
df1 = pd.DataFrame({
   'Jugadores': ["Federer", "Ronaldo", "Phelps", "Valenzuela"],
   'Posicion': ["Delantero", "Delantero", "Medio campista", "Defensa"]
})
# Show the pandas dtype of each column, then preview the rows.
print(df1.dtypes)
df1.head()

Jugadores    object
Posicion     object
dtype: object


Unnamed: 0,Jugadores,Posicion
0,Federer,Delantero
1,Ronaldo,Delantero
2,Phelps,Medio campista
3,Valenzuela,Defensa


In [43]:
# Convert the pandas frame to Spark with the helper defined in this notebook and display it.
DF1_SPARK=pandas_to_spark(df1)
DF1_SPARK.show()

+----------+--------------+
| Jugadores|      Posicion|
+----------+--------------+
|   Federer|     Delantero|
|   Ronaldo|     Delantero|
|    Phelps|Medio campista|
|Valenzuela|       Defensa|
+----------+--------------+



In [44]:
# Check the Spark schema produced by the conversion.
DF1_SPARK.printSchema()
# End: transform a pandas DataFrame into a Spark DataFrame

root
 |-- Jugadores: string (nullable = true)
 |-- Posicion: string (nullable = true)

