<a href="https://colab.research.google.com/github/luismiguelmartinluengo/PySpark_Demos/blob/main/Basicos_Dataframe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [127]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [128]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import lower, upper

In [129]:
'''para trabajar con RDD se usa SparkContext. Sirve para trabajar los datos a bajo nivel pero tiene menor funcionalidad y menos integración con otros servicios que SparkSession.
  -para trabajar con dataframes hay trabajar con SparkSession, que da acceso a las funcionalidades de de SQL, Dataframe, etc. SparkSession está pensado para trabajar con los
  datos a alto nivel
'''
sparkSession = SparkSession.builder.appName('Basicos Datafrae').getOrCreate()

In [130]:
#Crear Dataframe a partir de un RDD
datos = [('Hola',1,True,0.1),('como',2,False,0.2),('estan',3,True,0.3),('ustedes',4,False,0.4)]
rddDatos = sparkSession.sparkContext.parallelize(datos)
print(rddDatos.take(5))
#hay que crear el squema correspondiente a los datos del RDD que se pasarán para crear el Dataframe
esquemaDatos = StructType([StructField('Palabra', StringType(), True),
                           StructField('Orden', IntegerType(), False),
                           StructField('Flag', BooleanType(), False),
                           StructField('Porcentaje', FloatType(), True)])
df = sparkSession.createDataFrame(rddDatos, schema = esquemaDatos)
df.show()

[('Hola', 1, True, 0.1), ('como', 2, False, 0.2), ('estan', 3, True, 0.3), ('ustedes', 4, False, 0.4)]
+-------+-----+-----+----------+
|Palabra|Orden| Flag|Porcentaje|
+-------+-----+-----+----------+
|   Hola|    1| true|       0.1|
|   como|    2|false|       0.2|
|  estan|    3| true|       0.3|
|ustedes|    4|false|       0.4|
+-------+-----+-----+----------+



In [131]:
#visualización
df.show(3,truncate=False, vertical=True)

-RECORD 0-----------
 Palabra    | Hola  
 Orden      | 1     
 Flag       | true  
 Porcentaje | 0.1   
-RECORD 1-----------
 Palabra    | como  
 Orden      | 2     
 Flag       | false 
 Porcentaje | 0.2   
-RECORD 2-----------
 Palabra    | estan 
 Orden      | 3     
 Flag       | true  
 Porcentaje | 0.3   
only showing top 3 rows



In [132]:
#visualización, puede especificarse el número de filas a mostrar, pero para eso mejor usar take (por claridad de código, porque funcionalmente son lo mismo)
df.head()

Row(Palabra='Hola', Orden=1, Flag=True, Porcentaje=0.10000000149011612)

In [133]:
df.take(3)

[Row(Palabra='Hola', Orden=1, Flag=True, Porcentaje=0.10000000149011612),
 Row(Palabra='como', Orden=2, Flag=False, Porcentaje=0.20000000298023224),
 Row(Palabra='estan', Orden=3, Flag=True, Porcentaje=0.30000001192092896)]

In [134]:
#select --> devuelve otro data frame
dfSelect = df.select('Palabra', (df.Porcentaje * 100).alias('%'))
dfSelect.show()

+-------+---------+
|Palabra|        %|
+-------+---------+
|   Hola|     10.0|
|   como|     20.0|
|  estan|30.000002|
|ustedes|     40.0|
+-------+---------+



In [135]:
#Filter (df.where es equivalente a df.filter)
dfFilter1 = df.filter((df.Palabra.isin('Hola','como','quien')) | (df.Porcentaje > 0.35)) #como en Pandas, las claúsulas or tienen que estar encapsuladas por paréntesis
dfFilter1.show()

+-------+-----+-----+----------+
|Palabra|Orden| Flag|Porcentaje|
+-------+-----+-----+----------+
|   Hola|    1| true|       0.1|
|   como|    2|false|       0.2|
|ustedes|    4|false|       0.4|
+-------+-----+-----+----------+



In [136]:
#withColumn
dfWithColumn = df.withColumn('Palabra Mayuscula', upper(df['Palabra'])).withColumn('Palabra', lower(df['Palabra']))
dfWithColumn.show()

+-------+-----+-----+----------+-----------------+
|Palabra|Orden| Flag|Porcentaje|Palabra Mayuscula|
+-------+-----+-----+----------+-----------------+
|   hola|    1| true|       0.1|             HOLA|
|   como|    2|false|       0.2|             COMO|
|  estan|    3| true|       0.3|            ESTAN|
|ustedes|    4|false|       0.4|          USTEDES|
+-------+-----+-----+----------+-----------------+



In [137]:
pathSuperStore = "/content/drive/MyDrive/Colab Notebooks/data/Superstore.csv"
dfSuperStore = sparkSession.read.csv(pathSuperStore, header=True, quote = '"', escape = '"', inferSchema=True)
dfSuperStore.show(5)

+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|  Customer Name|  Segment|      Country|           City|     State|Postal Code|Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+---------------+---------+-------------+---------------+----------+-----------+------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|     1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|   CG-12520|    Claire Gute| Consumer|United States|      Henderson|  Kentucky|      42420| South|FUR-BO-10001798|      Furniture|   Bookcases|Bush Somerset 

In [138]:
#groupBy
dfSalesByCity = dfSuperStore.groupBy('City')\
                            .agg({'Sales':'sum'})\
                            .withColumnRenamed('sum(Sales)','Total Sales')\
                            .orderBy('Total Sales', ascending=False)
dfSalesByCity.show(5)

+-------------+------------------+
|         City|       Total Sales|
+-------------+------------------+
|New York City|        256368.161|
|  Los Angeles|        175851.341|
|      Seattle|        119540.742|
|San Francisco|112669.09199999992|
| Philadelphia|109077.01300000008|
+-------------+------------------+
only showing top 5 rows



In [139]:
#Window
#Reto: identificar el mejor cliente (Customer Name) por beneficio (Profit) de cada Segmento (Segment)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
dfProfitByCustomer = dfSuperStore.groupBy('Customer Name','Segment')\
                                .agg({'Profit':'sum'})\
                                .withColumnRenamed('sum(Profit)','Total Profit')
ventana = Window.partitionBy('Segment').orderBy(dfProfitByCustomer['Total Profit'].desc())
dfProfitByCustomer = dfProfitByCustomer.withColumn('rank', row_number().over(ventana))
dfTopCustomerBySegment = dfProfitByCustomer.filter(dfProfitByCustomer.rank == 1).drop('rank')
dfTopCustomerBySegment.show()

+-------------+-----------+-----------------+
|Customer Name|    Segment|     Total Profit|
+-------------+-----------+-----------------+
| Raymond Buch|   Consumer|        6976.0959|
| Tamara Chand|  Corporate|8981.323900000001|
| Tom Ashbrook|Home Office|4703.788299999999|
+-------------+-----------+-----------------+



In [140]:
from pyspark.sql.functions import year, split
dfSalesByYear = dfSuperStore.select('Order Date','Segment','Sales')
dfSalesByYear = dfSalesByYear.withColumn('OrderYear', split(dfSalesByYear['Order Date'],'/').getItem(2))\
                            .groupBy('OrderYear','Segment')\
                            .agg({'Sales':'sum'})\
                            .withColumnRenamed('sum(Sales)','Total Sales')\
                            .orderBy(['Segment','OrderYear'], ascending=True)
dfSalesByYear.show()

+---------+-----------+------------------+
|OrderYear|    Segment|       Total Sales|
+---------+-----------+------------------+
|     2014|   Consumer| 266096.8126000003|
|     2015|   Consumer| 266535.9332999999|
|     2016|   Consumer|296863.89920000033|
|     2017|   Consumer| 331904.6999000011|
|     2014|  Corporate|128434.87370000005|
|     2015|  Corporate|128757.30689999994|
|     2016|  Corporate|207106.36179999987|
|     2017|  Corporate|241847.82440000013|
|     2014|Home Office| 89715.81180000005|
|     2015|Home Office|        75239.2688|
|     2016|Home Office|105235.33699999996|
|     2017|Home Office|159462.73090000002|
+---------+-----------+------------------+



In [141]:
#groupby + pivot (partiendo del df original)
dfSalesPivotYear = dfSuperStore.withColumn('OrderYear', split(dfSuperStore['Order Date'],'/').getItem(2))\
                            .withColumn('Sales', dfSuperStore['Sales'].cast(DoubleType()))\
                            .groupBy('Segment')\
                            .pivot('OrderYear')\
                            .sum('Sales')\
                            .orderBy('Segment')
dfSalesPivotYear.show()


+-----------+------------------+------------------+------------------+------------------+
|    Segment|              2014|              2015|              2016|              2017|
+-----------+------------------+------------------+------------------+------------------+
|   Consumer| 266096.8126000003| 266535.9332999999|296863.89920000033| 331904.6999000011|
|  Corporate|128434.87370000005|128757.30689999994|207106.36179999987|241847.82440000013|
|Home Office| 89715.81180000005|        75239.2688|105235.33699999996|159462.73090000002|
+-----------+------------------+------------------+------------------+------------------+



In [142]:
#Cálculos múltiples
from pyspark.sql.functions import avg, max, min, first
dfAgg = dfSuperStore.groupBy('Segment','Region')\
                    .agg(avg('Profit').alias('AvgProfit'),
                         max('Quantity').alias('MaxQuantity'),
                         min('Quantity').alias('MinQuantity'),
                         first('Category').alias('FirstCategoy'))
dfAgg.show()

+-----------+-------+------------------+-----------+-----------+---------------+
|    Segment| Region|         AvgProfit|MaxQuantity|MinQuantity|   FirstCategoy|
+-----------+-------+------------------+-----------+-----------+---------------+
|   Consumer|Central| 7.066046287128713|         14|          1|Office Supplies|
|   Consumer|   East|28.040152688904023|         14|          1|      Furniture|
|   Consumer|  South| 32.11643532219569|         14|          1|      Furniture|
|   Consumer|   West| 34.36040909090916|         14|          1|      Furniture|
|  Corporate|Central| 27.79183060921253|         11|          1|Office Supplies|
|  Corporate|   East| 26.93566579247437|         14|          1|Office Supplies|
|  Corporate|  South| 29.83377098039214|         14|          1|Office Supplies|
|  Corporate|   West| 35.87232281250003|         14|          1|Office Supplies|
|Home Office|Central|28.398201826484033|         14|          1|Office Supplies|
|Home Office|   East| 53.205

In [144]:
#rollup
#Calculo del total de ventas por Segmento y Región
from pyspark.sql.functions import asc_nulls_last
dfSalesBySegmentAndRegion = dfSuperStore.rollup('Segment','Region')\
                                        .agg({'Sales':'sum'})\
                                        .withColumnRenamed('sum(Sales)','Total Sales')\
                                        .orderBy([asc_nulls_last('Segment'),asc_nulls_last('Region')])
dfSalesBySegmentAndRegion.show()

+-----------+-------+------------------+
|    Segment| Region|       Total Sales|
+-----------+-------+------------------+
|   Consumer|Central|252031.43400000007|
|   Consumer|   East|350908.16700000066|
|   Consumer|  South|195580.97100000017|
|   Consumer|   West| 362880.7730000003|
|   Consumer|   NULL|1161401.3449999888|
|  Corporate|Central|       157995.8128|
|  Corporate|   East|200409.34699999995|
|  Corporate|  South|121885.93250000005|
|  Corporate|   West|225855.27449999977|
|  Corporate|   NULL| 706146.3668000001|
|Home Office|Central| 91212.64399999994|
|Home Office|   East|127463.72599999998|
|Home Office|  South| 74255.00150000004|
|Home Office|   West| 136721.7769999999|
|Home Office|   NULL| 429653.1485000003|
|       NULL|   NULL| 2297200.860299955|
+-----------+-------+------------------+



In [145]:
# prompt: igual que la celda anterior pero usando cube en vez rollup

# Assuming dfSuperStore is your DataFrame
dfSalesBySegmentAndRegion = dfSuperStore.cube('Segment','Region')\
                                        .agg({'Sales':'sum'})\
                                        .withColumnRenamed('sum(Sales)','Total Sales')\
                                        .orderBy([asc_nulls_last('Segment'),asc_nulls_last('Region')])
dfSalesBySegmentAndRegion.show()

+-----------+-------+------------------+
|    Segment| Region|       Total Sales|
+-----------+-------+------------------+
|   Consumer|Central|252031.43400000007|
|   Consumer|   East|350908.16700000066|
|   Consumer|  South|195580.97100000017|
|   Consumer|   West| 362880.7730000003|
|   Consumer|   NULL|1161401.3449999888|
|  Corporate|Central|       157995.8128|
|  Corporate|   East|200409.34699999995|
|  Corporate|  South|121885.93250000005|
|  Corporate|   West|225855.27449999977|
|  Corporate|   NULL| 706146.3668000001|
|Home Office|Central| 91212.64399999994|
|Home Office|   East|127463.72599999998|
|Home Office|  South| 74255.00150000004|
|Home Office|   West| 136721.7769999999|
|Home Office|   NULL| 429653.1485000003|
|       NULL|Central| 501239.8908000005|
|       NULL|   East| 678781.2399999979|
|       NULL|  South| 391721.9050000003|
|       NULL|   West| 725457.8245000006|
|       NULL|   NULL| 2297200.860299955|
+-----------+-------+------------------+



In [146]:
#Imputación de valores nulos con ImputerModel
from pyspark.sql.functions import when, avg
from pyspark.ml.feature import Imputer
dfWithNulls = dfSuperStore.withColumn('Sales', when(dfSuperStore.Sales.between(158, 261), None).otherwise(dfSuperStore.Sales.cast(DoubleType())))
imputer = Imputer().setInputCol("Sales").setOutputCol("SalesImputed").setStrategy("mode")
imputerModel = imputer.fit(dfWithNulls) #Ojo con la documentación de PySpark, según ella el método fit devuelve un "Transformer", pero en realidad es un objeto ImputerModel
dfWithNulls = imputerModel.transform(dfWithNulls)
print('Nulos en columna dfWithNulls.Sales: ', dfWithNulls.filter(dfWithNulls.Sales.isNull()).count())
print('Nulos en columna dfWithNulls.SalesImputed: ', dfWithNulls.filter(dfWithNulls.SalesImputed.isNull()).count())
print('dfWithNulls.Sales.avg: ', dfWithNulls.select(avg('Sales')).collect()[0][0])
print('dfWithNulls.SalesImputed.avg: ', dfWithNulls.select(avg('SalesImputed')).collect()[0][0])


Nulos en columna dfWithNulls.Sales:  881
Nulos en columna dfWithNulls.SalesImputed:  0
dfWithNulls.Sales.avg:  232.23014214857454
dfWithNulls.SalesImputed.avg:  212.900845047022


In [147]:
#Columnas Array & explode
from pyspark.sql.functions import col, explode
dfArray = sparkSession.createDataFrame([('a',[1,2,3]),
                                        ('b',[4,5,6]),
                                        ('c',[7,8,9])],
                                        ['letra','numeros'])
dfArray.select(col('numeros')[1]).show()
dfArray.withColumn('numero', explode(col('numeros'))).drop('numeros').show()


+----------+
|numeros[1]|
+----------+
|         2|
|         5|
|         8|
+----------+

+-----+------+
|letra|numero|
+-----+------+
|    a|     1|
|    a|     2|
|    a|     3|
|    b|     4|
|    b|     5|
|    b|     6|
|    c|     7|
|    c|     8|
|    c|     9|
+-----+------+



In [148]:
#StructType
esquema = StructType([StructField('nombre', StringType(), True),
                      StructField('edad', IntegerType(), True),
                      StructField('direccion', StructType([StructField('calle', StringType(), True),
                                                           StructField('numero', IntegerType(), True),
                                                           StructField('ciudad', StringType(), True)]), True)])
datos = [('Juan',25,('Calle 1',123,'Parla')),
         ('Maria',30,('Calle 2',456,'Parla')),
         ('Pedro',28,('Calle 3',789,'Getafe'))]
dfStruct = sparkSession.createDataFrame(datos, schema = esquema)
dfStruct.show(truncate = False)
print('Pedro vive en', dfStruct.filter(dfStruct.nombre == 'Pedro').select('direccion.ciudad').collect()[0][0])

+------+----+----------------------+
|nombre|edad|direccion             |
+------+----+----------------------+
|Juan  |25  |{Calle 1, 123, Parla} |
|Maria |30  |{Calle 2, 456, Parla} |
|Pedro |28  |{Calle 3, 789, Getafe}|
+------+----+----------------------+

Pedro vive en Getafe


In [149]:
#MapType
esquemaMap = StructType([StructField('nombre', StringType(), True),
                        StructField('edad', IntegerType(), True),
                        StructField('atributos', MapType(StringType(), StringType()), True)])
datosMap = [('Juan',25,{'ojos':'azules','altura':'1.75','peso':'87'}),
            ('Maria',30,{'ojos':'marrones','altura':'1.65'}),
            ('Pedro',28,{'ojos':'verde','pelo':'rubio','altura':'1.80'})]
dfMap = sparkSession.createDataFrame(datosMap, schema = esquemaMap)
dfMap.show(truncate = False)
print('Pedro mide', dfMap.filter(dfMap.nombre == 'Pedro').select(dfMap.atributos['altura']).collect()[0][0])

+------+----+----------------------------------------------+
|nombre|edad|atributos                                     |
+------+----+----------------------------------------------+
|Juan  |25  |{ojos -> azules, peso -> 87, altura -> 1.75}  |
|Maria |30  |{altura -> 1.65, ojos -> marrones}            |
|Pedro |28  |{ojos -> verde, pelo -> rubio, altura -> 1.80}|
+------+----+----------------------------------------------+

Pedro mide 1.80


In [150]:
#UDF --> pyspark no tiene .apply() de pandas, así que para usar una función personalizada hay que encapsularla dentro de un objeto udf
from pyspark.sql.functions import udf
def deleteVocales(s):
  vocales = 'aeiouAEIOU'
  return ''.join([c for c in s if c not in vocales])
#End deleteVocales
udfDeleteVocales = udf(deleteVocales, StringType())
dfMap = dfMap.withColumn('nombre sin vocales', udfDeleteVocales(dfMap.nombre))
dfMap.show()


+------+----+--------------------+------------------+
|nombre|edad|           atributos|nombre sin vocales|
+------+----+--------------------+------------------+
|  Juan|  25|{ojos -> azules, ...|                Jn|
| Maria|  30|{altura -> 1.65, ...|                Mr|
| Pedro|  28|{ojos -> verde, p...|               Pdr|
+------+----+--------------------+------------------+

