# Practice of Pyspark & Databricks

In [0]:
# Leer CSV de ejemplo como DataFrame
diamonds = sqlContext.read.format('com.databricks.spark.csv').options(
header='true', inferSchema='true').load('/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv')

In [0]:
# Agregue una nueva celda y visualice el contenido del csv:
display(diamonds)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


In [0]:
# Mostrar el esquema del DataFrame:
diamonds.printSchema()

In [0]:
# Contar el número de filas en el conjunto de datos:
print(diamonds.count())

In [0]:
# Ver los distintos colores de diamantes en el conjunto de datos:
display(diamonds.select('color').distinct().collect())

color
F
E
D
J
G
I
H


In [0]:
# Crear un nuevo DataFrame con la columna price de tipo Double:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import *
diamondsCast = diamonds.withColumn("price", diamonds["price"].cast(DoubleType()))

In [0]:
# Calcular el precio promedio por valor carat:
carat_avgPrice = (diamondsCast
               .groupBy("carat")
               .avg("price")
               .withColumnRenamed("avg(price)", "avgPrice")
               .orderBy(desc("avgPrice")))

In [0]:
# Ver el top-10 de los precios promedio más altos para el valor carat:
carat_avgPrice.show(10)

In [0]:
# Para realizar visualizaciones rápidas sobre los datos, puede utilizar las opciones proporcionadas por el notebook:
display(diamonds)

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


In [0]:
# Manipulación de datos con RDD
# Se puede obtener un RDD directamente del DataFrame.
diamonds_rdd = diamonds.rdd

In [0]:
# Ver los primeros tres elementos del RDD:
diamonds_rdd.take(3)

In [0]:
# Contar diamantes por cut:
countByGroup = diamonds_rdd.map(lambda x: (x.cut, 1)).reduceByKey(lambda x,y: x+y)
display(countByGroup.collect())

_1,_2
Ideal,21551
Premium,13791
Good,4906
Very Good,12082
Fair,1610


In [0]:
# Distintos tipos de clarity para los diamantes en el conjunto de datos:
distinctClarity = diamonds_rdd.map(lambda x: x.clarity).distinct()
distinctClarity.collect()

In [0]:
# Precio promedio de los diamantes por cut:
avgPrice = diamonds_rdd.map(lambda x: (x.cut, float(x.price))).reduceByKey(lambda x,y: (x+y)/2)
display(avgPrice.collect())

_1,_2
Ideal,2756.7240663718817
Premium,2756.654813661215
Good,2755.647409027791
Very Good,2756.7183661747795
Fair,2743.567771968392


In [0]:
%fs rm -r /FileStore/carat_avgPrice

In [0]:
# Guardar la consulta al archivo CSV en la ruta FileStore. Cada partición será guardada en un archivo individual. Colocando repartition(1) se creará un único archivo de salida:

(carat_avgPrice.repartition(1)
            .write
            .format('com.databricks.spark.csv')
            .options(header='true'
).save('/FileStore/carat_avgPrice'))

In [0]:
%fs rm -r /FileStore/cutPrice

In [0]:
# Seleccionar las columnas cut y price
(diamonds.repartition(1)
  .select('cut', 'price')
  .write
  .format('com.databricks.spark.csv')
  .options(header='true'
).save('/FileStore/cutPrice'))

In [0]:
# Podrá acceder a los archivos en las rutas:

# /FileStore/carat_avgPrice
# /FileStore/cutPrice

In [0]:
# Leer y mostrar el CSV guardado anteriormente. Los puede descargar dando click en Imgur:
cutPrice = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('/FileStore/cutPrice')
display(cutPrice)

cut,price
Ideal,326
Premium,326
Good,327
Premium,334
Good,335
Very Good,336
Very Good,336
Very Good,337
Fair,337
Very Good,338
