# Data Processing using PySpark

In [45]:
#import SparkSession
from pyspark.sql import SparkSession

First we import `SparkSession` from the `pyspark.sql` module of the PySpark library. It is the entry point for the DataFrame functions used in the rest of the notebook.

In [114]:
#create spark session object
spark=SparkSession.builder.appName('data_processing').getOrCreate()

We create a Spark session object. `getOrCreate()` returns the existing session if there is one or starts a new one, and the resulting `spark` object gives access to Spark's functionality.

In [47]:
# Load csv Dataset 
df=spark.read.csv('s3://reto5-notebooks/datasets-ssh/sample_data.csv',inferSchema=True,header=True)

One of those capabilities is reading files from remote storage, here an S3 bucket. The result is a DataFrame: `header=True` takes the column names from the first line of the file, and `inferSchema=True` asks Spark to infer each column's type from the data instead of reading everything as strings.

In [48]:
#columns of dataframe
df.columns

['ratings', 'age', 'experience', 'family', 'mobile']

DataFrames (abbreviated df from here on) expose their metadata as attributes. One of them is `columns`, which returns the column names, that is, the headers of the data.

In [49]:
#check number of columns
len(df.columns)

5

This gives the number of columns (features) in the data.

In [50]:
#number of records in dataframe
df.count()

33

`count()` returns the number of records in the df, that is, the number of rows.

In [51]:
#shape of dataset
print((df.count(),len(df.columns)))

(33, 5)

These are the same two values, printed together as a tuple. Taken together, (rows, columns) is the shape of the df.

In [52]:
#printSchema
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: integer (nullable = true)
 |-- mobile: string (nullable = true)

`printSchema()` prints the schema as a tree (a purely visual format): each column name with its data type and whether it accepts null values.

In [53]:
#first few rows of dataframe
df.show(5)

+-------+---+----------+------+-------+
|ratings|age|experience|family| mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|     3|   Vivo|
|      3| 27|      13.0|     3|  Apple|
|      4| 22|       2.5|     0|Samsung|
|      4| 37|      16.5|     4|  Apple|
|      5| 27|       9.0|     1|     MI|
+-------+---+----------+------+-------+
only showing top 5 rows

`show(n)` displays the first n rows (records) of the dataframe, 5 in this case.

In [54]:
#select only 2 columns
df.select('age','mobile').show(5)

+---+-------+
|age| mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
| 37|  Apple|
| 27|     MI|
+---+-------+
only showing top 5 rows

We can also show the first n rows restricted to certain columns (features) by calling `select` first.

In [55]:
#info about dataframe
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|  null|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|1.8448330794164254|  null|
|    min|                 1|                22|               2.5|                 0| Apple|
|    max|                 5|                42|              23.0|                 5|  Vivo|
+-------+------------------+------------------+------------------+------------------+------+

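`describe()` returns a tabular summary of the main statistics of the sample for each column: count, mean, standard deviation, minimum and maximum. For the string column `mobile`, mean and stddev are null, and min and max follow alphabetical order.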

In [56]:
from pyspark.sql.types import StringType,DoubleType,IntegerType

Next we import data types from the `pyspark.sql.types` module. We will use them below when casting columns and when declaring the return types of functions.

In [57]:
#with column
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)

+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3      |32 |9.0       |3     |Vivo   |42              |
|3      |27 |13.0      |3     |Apple  |37              |
|4      |22 |2.5       |0     |Samsung|32              |
|4      |37 |16.5      |4     |Apple  |47              |
|5      |27 |9.0       |1     |MI     |37              |
|4      |27 |9.0       |0     |Oppo   |37              |
|5      |37 |23.0      |5     |Vivo   |47              |
|5      |37 |23.0      |5     |Samsung|47              |
|3      |22 |2.5       |0     |Apple  |32              |
|3      |27 |6.0       |0     |MI     |37              |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows

This is a transformation that returns a new df with an added column. The column can be derived from an existing column, built from scratch, or replace a column of the same name. Here we derive the new age from the `age` column.

In [58]:
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)

+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|mobile |age_double|
+-------+---+----------+------+-------+----------+
|3      |32 |9.0       |3     |Vivo   |32.0      |
|3      |27 |13.0      |3     |Apple  |27.0      |
|4      |22 |2.5       |0     |Samsung|22.0      |
|4      |37 |16.5      |4     |Apple  |37.0      |
|5      |27 |9.0       |1     |MI     |27.0      |
|4      |27 |9.0       |0     |Oppo   |27.0      |
|5      |37 |23.0      |5     |Vivo   |37.0      |
|5      |37 |23.0      |5     |Samsung|37.0      |
|3      |22 |2.5       |0     |Apple  |22.0      |
|3      |27 |6.0       |0     |MI     |27.0      |
+-------+---+----------+------+-------+----------+
only showing top 10 rows

Same idea: we derive a new column from `age`, but this time we cast it to double.

In [60]:
#filter the records 
df.filter(df['mobile']=='Vivo').show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      3| 32|       9.0|     3|  Vivo|
|      5| 37|      23.0|     5|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
|      4| 37|       6.0|     0|  Vivo|
+-------+---+----------+------+------+

A filter operation over the data. Here we keep only the records whose `mobile` value is "Vivo".

In [61]:
#filter the records 
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()

+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+

The same filter on `mobile`, followed by `select` so that only certain columns are shown.

In [62]:
#filter on multiple conditions (chained filters)
df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+

Chained filters: `mobile` equal to Vivo AND `experience` greater than 10.

In [63]:
#filter on multiple conditions (single combined condition)
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|     5|  Vivo|
|      5| 37|      13.0|     1|  Vivo|
+-------+---+----------+------+------+

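Another way to combine conditions: a single `filter` whose condition joins both comparisons with `&`, instead of applying one filter after the other as in the previous cell. Each comparison must be wrapped in parentheses because `&` binds more tightly than `==` and `>` in Python. The result is the same in both cases.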

In [64]:
#Distinct Values in a column
df.select('mobile').distinct().show()

+-------+
| mobile|
+-------+
|Samsung|
|     MI|
|   Oppo|
|  Apple|
|   Vivo|
+-------+

We show only the distinct values of a column.

In [65]:
#distinct value count
df.select('mobile').distinct().count()

5

The number of distinct values in that column.

In [66]:
df.groupBy('mobile').count().show(5,False)

+-------+-----+
|mobile |count|
+-------+-----+
|Samsung|6    |
|MI     |8    |
|Oppo   |7    |
|Apple  |7    |
|Vivo   |5    |
+-------+-----+

The number of records for each distinct value of a column. As its name says, `groupBy` groups the rows by the given criterion.

In [67]:
# Value counts
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)

+-------+-----+
|mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Apple  |7    |
|Samsung|6    |
|Vivo   |5    |
+-------+-----+

Same command as before, but the result is sorted by a column (`count`), and `ascending` controls the direction. With `ascending=False` the largest counts come first.

In [68]:
# Mean of each numeric column per group
df.groupBy('mobile').mean().show(5,False)

+-------+------------------+------------------+------------------+------------------+
|mobile |avg(ratings)      |avg(age)          |avg(experience)   |avg(family)       |
+-------+------------------+------------------+------------------+------------------+
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333|
|MI     |3.5               |30.125            |10.1875           |1.375             |
|Oppo   |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Apple  |3.4285714285714284|30.571428571428573|11.0              |2.7142857142857144|
|Vivo   |4.2               |36.0              |11.4              |1.8               |
+-------+------------------+------------------+------------------+------------------+

The mean of every numeric column, computed within each group.

In [69]:
df.groupBy('mobile').sum().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|Samsung|25          |172     |52.0           |11         |
|MI     |28          |241     |81.5           |11         |
|Oppo   |20          |199     |72.5           |10         |
|Apple  |24          |214     |77.0           |19         |
|Vivo   |21          |180     |57.0           |9          |
+-------+------------+--------+---------------+-----------+

The sum of every numeric column, computed within each group.

In [70]:
# Max of each numeric column per group
df.groupBy('mobile').max().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|Samsung|5           |37      |23.0           |5          |
|MI     |5           |42      |23.0           |5          |
|Oppo   |4           |42      |23.0           |2          |
|Apple  |4           |37      |16.5           |5          |
|Vivo   |5           |37      |23.0           |5          |
+-------+------------+--------+---------------+-----------+

The maximum of every numeric column, computed within each group.

In [71]:
# Min of each numeric column per group
df.groupBy('mobile').min().show(5,False)

+-------+------------+--------+---------------+-----------+
|mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|Samsung|2           |22      |2.5            |0          |
|MI     |1           |27      |2.5            |0          |
|Oppo   |2           |22      |6.0            |0          |
|Apple  |3           |22      |2.5            |0          |
|Vivo   |3           |32      |6.0            |0          |
+-------+------------+--------+---------------+-----------+

The minimum of every numeric column, computed within each group.
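The Vivo line of each grouped table can be checked by hand, because the earlier filter on `mobile == 'Vivo'` printed all five Vivo rows. The sketch below recomputes mean, sum, max and min for that group in plain Python, with no Spark involved. The names `vivo_rows` and `vivo_stats` are illustrative.

```python
from statistics import mean

# The five Vivo rows as printed by the filter cell:
# (ratings, age, experience, family)
vivo_rows = [
    (3, 32, 9.0, 3),
    (5, 37, 23.0, 5),
    (4, 37, 6.0, 0),
    (5, 37, 13.0, 1),
    (4, 37, 6.0, 0),
]
columns = ["ratings", "age", "experience", "family"]

# zip(*vivo_rows) transposes the rows into one tuple of values per column.
vivo_stats = {
    name: {
        "mean": mean(values),
        "sum": sum(values),
        "max": max(values),
        "min": min(values),
    }
    for name, values in zip(columns, zip(*vivo_rows))
}

for name, stats in vivo_stats.items():
    print(name, stats)
```

The printed values should agree with the Vivo row of the `mean()`, `sum()`, `max()` and `min()` tables, which is a quick way to confirm what each grouped aggregation computes.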

In [72]:
#Aggregation
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)

+-------+---------------+
|mobile |sum(experience)|
+-------+---------------+
|Samsung|52.0           |
|MI     |81.5           |
|Oppo   |72.5           |
|Apple  |77.0           |
|Vivo   |57.0           |
+-------+---------------+

`agg` produces a df whose columns are aggregations over existing columns. In this case the df is grouped by `mobile`, and the new column holds the sum of `experience` for each brand.

In [73]:
# UDF
from pyspark.sql.functions import udf

We import `udf`, which lets us define our own functions (user-defined functions) and apply them to a df.

In [74]:
#normal function 
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return 'High Price'
    elif brand =='MI':
        return 'Mid Price'
    else:
        return 'Low Price'

This is an ordinary Python function. It takes a brand name (the values of the `mobile` column will be passed in as `brand`) and returns a price category according to simple rules.
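Because `price_range` is plain Python, it can be checked directly before it is wrapped in a UDF. The sketch below repeats the function so that the block is self-contained and calls it on each brand in the dataset, plus `None` to see what a missing value would produce.

```python
def price_range(brand):
    if brand in ['Samsung', 'Apple']:
        return 'High Price'
    elif brand == 'MI':
        return 'Mid Price'
    else:
        return 'Low Price'

# Every brand in the dataset, plus None to stand in for a null value.
for brand in ['Samsung', 'Apple', 'MI', 'Oppo', 'Vivo', None]:
    print(brand, '->', price_range(brand))
```

Note that anything not listed explicitly, including `None`, falls through to the `else` branch and is labelled 'Low Price'.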

In [75]:
#create udf using python function
brand_udf=udf(price_range,StringType())
#apply udf on dataframe
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)

+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3     |Vivo   |Low Price  |
|3      |27 |13.0      |3     |Apple  |High Price |
|4      |22 |2.5       |0     |Samsung|High Price |
|4      |37 |16.5      |4     |Apple  |High Price |
|5      |27 |9.0       |1     |MI     |Mid Price  |
|4      |27 |9.0       |0     |Oppo   |Low Price  |
|5      |37 |23.0      |5     |Vivo   |Low Price  |
|5      |37 |23.0      |5     |Samsung|High Price |
|3      |22 |2.5       |0     |Apple  |High Price |
|3      |27 |6.0       |0     |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows

With `udf` we wrap the previous function, declaring `StringType()` as its return type, and apply it to the `mobile` column. The new `price_range` column holds the function's return value, in this case a price-level classifier.

In [76]:
#using lambda function
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
#apply udf on dataframe
df.withColumn("age_group", age_udf(df.age)).show(10,False)

+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3     |Vivo   |senior   |
|3      |27 |13.0      |3     |Apple  |young    |
|4      |22 |2.5       |0     |Samsung|young    |
|4      |37 |16.5      |4     |Apple  |senior   |
|5      |27 |9.0       |1     |MI     |young    |
|4      |27 |9.0       |0     |Oppo   |young    |
|5      |37 |23.0      |5     |Vivo   |senior   |
|5      |37 |23.0      |5     |Samsung|senior   |
|3      |22 |2.5       |0     |Apple  |young    |
|3      |27 |6.0       |0     |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows

We can also build a UDF from a lambda function, in this case an age classifier that we add with `withColumn` over the `age` column.
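A Python UDF receives null column values as `None`, and in Python 3 the comparison `None <= 30` raises a `TypeError`. This dataset appears to have no nulls in `age` (`describe()` reports a count of 33 for every column), so the lambda works here. A more defensive variant, sketched below with the hypothetical name `age_group`, returns `None` for missing ages.

```python
def age_group(age):
    # Spark passes null column values to a Python UDF as None.
    if age is None:
        return None
    return "young" if age <= 30 else "senior"

# Check the boundary at 30 and the null case.
for age in (22, 30, 31, None):
    print(age, "->", age_group(age))
```

It could be registered the same way as the lambda, for example with `udf(age_group, StringType())`.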

In [77]:
#pandas udf
from pyspark.sql.functions import pandas_udf, PandasUDFType

Now we import `pandas_udf` together with `PandasUDFType`. A pandas UDF is vectorized: it receives the column values in batches as pandas Series instead of one value at a time.

In [78]:
#create python function
def remaining_yrs(age):
    yrs_left=100-age

    return yrs_left

We create a function that transforms a value: given an age, it computes 100 minus the age, that is, the years remaining until 100.

In [105]:

#create udf using python function
length_udf = pandas_udf(remaining_yrs, IntegerType())
#apply pandas udf on dataframe
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)

An error was encountered:
Pandas >= 0.23.2 must be installed; however, it was not found.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 331, in pandas_udf
    require_minimum_pandas_version()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/utils.py", line 33, in require_minimum_pandas_version
    "it was not found." % minimum_pandas_version)
ImportError: Pandas >= 0.23.2 must be installed; however, it was not found.

The cell cannot run because pandas (version 0.23.2 or later) is not installed in this environment.

The intent is still clear: `length_udf` is created with `pandas_udf` from the base Python function and the result type (an integer here), and it is then applied to the `age` column to add `yrs_left`.

In [80]:
#udf using two columns 
def prod(rating,exp):
    x=rating*exp
    return x

Now a function that uses two columns: it multiplies them and returns the result.

In [81]:
#create udf using python function
prod_udf = pandas_udf(prod, DoubleType())
#apply pandas udf on multiple columns of dataframe
df.withColumn("product", prod_udf(df['ratings'],df['experience'])).show(10,False)

An error was encountered:
Pandas >= 0.23.2 must be installed; however, it was not found.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 331, in pandas_udf
    require_minimum_pandas_version()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/utils.py", line 33, in require_minimum_pandas_version
    "it was not found." % minimum_pandas_version)
ImportError: Pandas >= 0.23.2 must be installed; however, it was not found.

Again the cell cannot run because pandas is not installed.

`prod_udf` is created from the `prod` function and returns values of type double.

The last line would display the df with a new `product` column holding `ratings` multiplied by `experience`.

In [82]:
#duplicate values
df.count()

33

This counts every row, duplicates included.

In [83]:
#drop duplicate values
df=df.dropDuplicates()

We drop the duplicate (repeated) records. Called without arguments, `dropDuplicates()` treats two rows as duplicates only when every column matches.

In [84]:
#validate new count
df.count()

26

We count again to validate the result: 26 unique rows remain, so 7 of the original 33 were repeats.
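The five Vivo rows printed earlier contain one exact repeat, which makes them a convenient illustration of whole-row deduplication. The sketch below uses plain Python tuples rather than Spark; `vivo_rows` and `unique_rows` are illustrative names.

```python
# The five Vivo rows as printed by the filter cell; the third and fifth are identical.
vivo_rows = [
    (3, 32, 9.0, 3, "Vivo"),
    (5, 37, 23.0, 5, "Vivo"),
    (4, 37, 6.0, 0, "Vivo"),
    (5, 37, 13.0, 1, "Vivo"),
    (4, 37, 6.0, 0, "Vivo"),
]

# dict.fromkeys keeps one copy of each distinct tuple (the first occurrence).
unique_rows = list(dict.fromkeys(vivo_rows))

print(len(vivo_rows), "->", len(unique_rows))
```

Unlike this sketch, Spark gives no guarantee about which copy is kept or about the row order afterwards. `dropDuplicates` also accepts a list of column names when only a subset of columns should be compared.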

In [85]:
#drop column of dataframe
df_new=df.drop('mobile')

`drop` removes a column, in this case `mobile`. It returns a new df, which we store in `df_new`.

In [86]:
df_new.show(10)

+-------+---+----------+------+
|ratings|age|experience|family|
+-------+---+----------+------+
|      4| 22|       2.5|     0|
|      4| 22|       6.0|     1|
|      3| 27|       6.0|     0|
|      2| 32|      16.5|     2|
|      4| 27|       9.0|     0|
|      3| 37|      16.5|     5|
|      4| 27|       6.0|     1|
|      4| 37|       9.0|     2|
|      3| 22|       2.5|     0|
|      3| 32|       9.0|     3|
+-------+---+----------+------+
only showing top 10 rows

We display the df after both changes: duplicates removed and the column dropped.

In [87]:
# saving file (csv)

In [106]:
#current working directory
pwd

An error was encountered:
name 'pwd' is not defined
Traceback (most recent call last):
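NameError: name 'pwd' is not defined

`pwd` is a Unix shell command (also available as an IPython magic) that prints the current working directory. Here the cell is evaluated as plain Python code, so the bare name `pwd` raises a `NameError`.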
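A plain-Python way to get the same information is `os.getcwd()` from the standard library. This is a minimal sketch; it reports the working directory of whichever Python process executes the cell, which on a remote Spark kernel is presumably the driver rather than the machine displaying the notebook.

```python
import os

# Working directory of the Python process that runs this cell.
current_dir = os.getcwd()
print(current_dir)
```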

In [110]:
#target directory 
write_uri='s3://reto5-notebooks/datasets-reto5-3-2/data_processing/df_csv'

We store the URI of the bucket location where the df will be saved.

In [111]:
#save the dataframe as single csv 
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

Here we write the df in CSV format, headers included, to the location above. `coalesce(1)` reduces the df to a single partition so that Spark writes a single part file; the URI itself becomes a directory that contains it.

In [91]:
# parquet

In [112]:
#target location
parquet_uri='s3://reto5-notebooks/datasets-reto5-3-2/data_processing/df_parquet'

We declare the URI for the Parquet output. Parquet is a columnar format that stores the schema alongside the data, which generally makes it more efficient than CSV for storing and reading DataFrames.

In [113]:
#save the data into parquet format 
df.write.format('parquet').save(parquet_uri)

Finally, we save the df in Parquet format at the S3 URI.