# Instalación de Java y PySpark

In [2]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 65 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 58.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=2070006d4ef2117c794999a92fcb6590116ad0d9aff42837bfd51161b918acb2
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhe

## Importación de las librerías

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [4]:
# Session configuration: expose the Spark UI on port 4050 (tunnelled with ngrok below).
conf = SparkConf().set("spark.ui.port", "4050")

# Build (or reuse) a single SparkSession. Creating a SparkContext by hand first meant
# the builder's .master()/.config() were ignored (getOrCreate reuses the existing
# context) and re-running the cell failed with "Cannot run multiple SparkContexts".
# NOTE(review): local[*] (all local cores) is assumed to match what the hand-made
# context actually used; the old builder's "local" never took effect.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Google colab - The Bridge")
    .config(conf=conf)
    .getOrCreate()
)

# Keep `sc` available for code that expects the low-level SparkContext.
sc = spark.sparkContext

In [25]:
# Para monitorizar al puerto 4050 de spark
#!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

Archive:  ngrok-stable-linux-amd64.zip
replace ngrok? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: ngrok                   
{"tunnels":[{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://b7ea-35-245-31-45.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://b7ea-35-245-31-45.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


## Importamos nuestro dataset

Conviene leer la documentación de `spark.read` https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

In [28]:
!wget https://github.com/marcusRB/The_Bridge_School_DataScience_PT/blob/main/02_DataAnalysis/dataset/train_oSwQCTC.zip

--2021-10-02 08:14:37--  https://github.com/marcusRB/The_Bridge_School_DataScience_PT/blob/main/02_DataAnalysis/dataset/train_oSwQCTC.zip
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘train_oSwQCTC.zip’

train_oSwQCTC.zip       [<=>                 ]       0  --.-KB/s               train_oSwQCTC.zip       [ <=>                ] 129.75K  --.-KB/s    in 0.007s  

2021-10-02 08:14:37 (18.7 MB/s) - ‘train_oSwQCTC.zip’ saved [132859]



In [10]:
!unzip /content/train_oSwQCTC.zip

Archive:  /content/train_oSwQCTC.zip
  inflating: train.csv               


## Funciones básicas de SparkSQL - Dataframe

https://spark.apache.org/docs/latest/sql-programming-guide.html

In [11]:
# Build a Spark DataFrame from the CSV: the first row is the header and column types are inferred.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/train.csv")
)

In [29]:
# Confirm the object is a Spark DataFrame (not a pandas one).
df.__class__

pyspark.sql.dataframe.DataFrame

In [31]:
# Load the same CSV with pandas to compare the two DataFrame types.
csv_path = "/content/train.csv"
df_pandas = pd.read_csv(csv_path)
df_pandas.__class__

pandas.core.frame.DataFrame

In [32]:
# First five rows of the pandas version.
df_pandas.head(n=5)

Unnamed: 0,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
0,1000001,P00069042,F,0-17,10,A,2,0,3,,,8370
1,1000001,P00248942,F,0-17,10,A,2,0,1,6.0,14.0,15200
2,1000001,P00087842,F,0-17,10,A,2,0,12,,,1422
3,1000001,P00085442,F,0-17,10,A,2,0,12,14.0,,1057
4,1000002,P00285442,M,55+,16,C,4+,0,8,,,7969


In [33]:
# Print the first 20 rows as a table (long cell values are truncated).
df.show(n=20)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [12]:
# Print the schema Spark inferred (column names, types, nullability) as a tree.
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [13]:
# Display the DataFrame contents with DataFrame.show() (first 20 rows, truncated cells).
df.show(n=20, truncate=True)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [14]:
# Vertical layout: one block per record, easier to read for wide rows.
df.show(n=5, truncate=True, vertical=True)

-RECORD 0-------------------------------
 User_ID                    | 1000001   
 Product_ID                 | P00069042 
 Gender                     | F         
 Age                        | 0-17      
 Occupation                 | 10        
 City_Category              | A         
 Stay_In_Current_City_Years | 2         
 Marital_Status             | 0         
 Product_Category_1         | 3         
 Product_Category_2         | null      
 Product_Category_3         | null      
 Purchase                   | 8370      
-RECORD 1-------------------------------
 User_ID                    | 1000001   
 Product_ID                 | P00248942 
 Gender                     | F         
 Age                        | 0-17      
 Occupation                 | 10        
 City_Category              | A         
 Stay_In_Current_City_Years | 2         
 Marital_Status             | 0         
 Product_Category_1         | 1         
 Product_Category_2         | 6         
 Product_Categor

In [15]:
# Horizontal table with cell values shown in full (no truncation).
df.show(n=5, truncate=False, vertical=False)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age |Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001|P00069042 |F     |0-17|10        |A            |2                         |0             |3                 |null              |null              |8370    |
|1000001|P00248942 |F     |0-17|10        |A            |2                         |0             |1                 |6                 |14                |15200   |
|1000001|P00087842 |F     |0-17|10        |A            |2                         |0             |12                |null              |null              |1422    |
|100

In [34]:
# Bring the first five rows to the driver as a list of Row objects.
df.take(5)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

In [35]:
# Still a Spark DataFrame (head/show do not change it).
df.__class__

pyspark.sql.dataframe.DataFrame

In [17]:
# Total number of rows; count() is an action, so it runs a Spark job.
n_rows = df.count()
n_rows

550068

In [18]:
# Keep only a few columns; select() is lazy, so nothing is computed until an action such as show().
subset_1 = df.select(["User_ID", "Gender", "Age", "Occupation"])

In [19]:
# A projection of a Spark DataFrame is itself a Spark DataFrame.
subset_1.__class__

pyspark.sql.dataframe.DataFrame

In [20]:
# Show the first 10 rows of the subset.
subset_1.show(n=10)

+-------+------+-----+----------+
|User_ID|Gender|  Age|Occupation|
+-------+------+-----+----------+
|1000001|     F| 0-17|        10|
|1000001|     F| 0-17|        10|
|1000001|     F| 0-17|        10|
|1000001|     F| 0-17|        10|
|1000002|     M|  55+|        16|
|1000003|     M|26-35|        15|
|1000004|     M|46-50|         7|
|1000004|     M|46-50|         7|
|1000004|     M|46-50|         7|
|1000005|     M|26-35|        20|
+-------+------+-----+----------+
only showing top 10 rows



In [21]:
# Summary statistics per column (count, mean, stddev, min, max).
summary_stats = df.describe()
summary_stats.show(10)

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

### Funciones en Spark
Referencia de `pyspark.sql.functions` para la versión instalada (3.1.2): https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#functions

In [36]:
# Importamos las funciones de pyspark
from pyspark.sql import functions as F

In [41]:
# Total purchase amount per city category. GroupedData.sum() is shorthand for
# .agg(F.sum(...)) and produces the same "sum(Purchase)" column.
(
    df.groupBy("City_Category")
    .sum("Purchase")
    .show(5)
)

+-------------+-------------+
|City_Category|sum(Purchase)|
+-------------+-------------+
|            B|   2115533605|
|            C|   1663807476|
|            A|   1316471661|
+-------------+-------------+



In [42]:
# Same aggregation as above. A bare `sum("Purchase")` is NOT Python's built-in sum():
# the wildcard `from pyspark.sql.functions import *` in the imports cell shadows the
# builtin with pyspark.sql.functions.sum (the builtin would raise TypeError on a string).
# Spelling it F.sum removes the hidden dependency on that wildcard import.
(
    df.groupBy("City_Category")
    .agg(F.sum("Purchase"))
    .show(5)
)

+-------------+-------------+
|City_Category|sum(Purchase)|
+-------------+-------------+
|            B|   2115533605|
|            C|   1663807476|
|            A|   1316471661|
+-------------+-------------+



In [46]:
# Count the null values in every column (this only counts; nothing is removed here).
# F.when(...) yields a non-null marker only for null cells and F.count skips nulls,
# so each result is that column's null count. An explicit F.lit(1) states the intent;
# the previous `when(..., c)` only worked because when() treats a plain str as a
# literal value, not as a column reference.
null_counts = df.select([
    F.count(F.when(F.col(c).isNull(), F.lit(1))).alias(c)
    for c in df.columns
])
null_counts.show(n=5, truncate=False, vertical=True)

-RECORD 0----------------------------
 User_ID                    | 0      
 Product_ID                 | 0      
 Gender                     | 0      
 Age                        | 0      
 Occupation                 | 0      
 City_Category              | 0      
 Stay_In_Current_City_Years | 0      
 Marital_Status             | 0      
 Product_Category_1         | 0      
 Product_Category_2         | 173638 
 Product_Category_3         | 383247 
 Purchase                   | 0      



In [50]:
# Replace missing product categories with 0 (both columns are integer-typed).
# NOTE: this rebinds `df`, so cells above that displayed df reflect the pre-fill data.
df = df.na.fill(0, subset=["Product_Category_2", "Product_Category_3"])

In [52]:
# Re-count nulls per column to check the fill above.
nulls_after_fill = df.select(
    [F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]
)
nulls_after_fill.show(5, False, True)

-RECORD 0-------------------------
 User_ID                    | 0   
 Product_ID                 | 0   
 Gender                     | 0   
 Age                        | 0   
 Occupation                 | 0   
 City_Category              | 0   
 Stay_In_Current_City_Years | 0   
 Marital_Status             | 0   
 Product_Category_1         | 0   
 Product_Category_2         | 0   
 Product_Category_3         | 0   
 Purchase                   | 0   



## Manipulación de ficheros en Spark

In [56]:
# Write the cleaned data as CSV under /content/preprocessed_data. Spark writes a
# directory of part-files, not a single file.
# header=True keeps the column names (Spark omits them by default);
# mode="overwrite" lets the cell be re-run instead of failing because the path exists.
df.write.csv(
    "/content/preprocessed_data",
    header=True,
    mode="overwrite",
)

In [57]:
# Same CSV export, into Google Drive.
# NOTE(review): this path is only Drive once Drive is mounted at /content/drive
# (e.g. google.colab.drive.mount); no cell here mounts it, so otherwise the files
# presumably land in a plain local folder with that name.
# header=True keeps column names; mode="overwrite" makes the cell re-runnable.
df.write.csv(
    "/content/drive/My Drive/preprocessed_data",
    header=True,
    mode="overwrite",
)

In [58]:
# Number of partitions of df, which explains how many part-files the CSV writes produced.
# NOTE: the SparkConf in the setup cell only sets spark.ui.port, so it does not set this value.
# It presumably comes from how the input CSV was split across the session's default
# parallelism (local cores). Confirm in the Spark UI.
df.rdd.getNumPartitions()

2

In [59]:
# Collect to the driver as a pandas DataFrame and write a single CSV file.
# index=False avoids writing pandas' row index as an extra unnamed first column.
# NOTE: toPandas() loads every row into driver memory (550k rows here).
df_pd = df.toPandas()
df_pd.to_csv("pandas_preprocessed_data.csv", index=False)

In [60]:
# Write in Parquet format (the schema is stored with the data).
# mode="overwrite" lets the cell be re-run instead of failing because the path exists.
df.write.parquet("/content/parquet_preprocessed_data", mode="overwrite")

In [64]:
# Read the Parquet output back and show a few rows.
pq = spark.read.format("parquet").load("/content/parquet_preprocessed_data/")
pq.show(n=5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                 0|                 0|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                 0|                 0|    1422|
|100

In [70]:
# Write in ORC format. DataFrameWriter.orc() returns None, so there is nothing to assign.
# mode="overwrite" (instead of "append") stops every re-run from adding another copy of the data.
df.write.orc("/content/orc_preprocessed_data/", mode="overwrite")

In [67]:
# Read the ORC output back and show a few rows.
orc_back = spark.read.format("orc").load("/content/orc_preprocessed_data/")
orc_back.show(n=5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                 0|                 0|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                 0|                 0|    1422|
|100

## Utilizando la sintaxis de SQL

In [71]:
# Register df as a temporary view so it can be queried with SQL in this session.
df.createOrReplaceTempView(name="tabla_temporanea")

In [73]:
# Row count through SQL (same figure as df.count()).
count_query = """
  SELECT count(*) FROM tabla_temporanea
  """
spark.sql(count_query).show(5)

+--------+
|count(1)|
+--------+
|  550068|
+--------+



In [79]:
# First five rows through SQL, printed vertically without truncation.
first_rows_query = """
SELECT * FROM tabla_temporanea LIMIT 5;
"""
spark.sql(first_rows_query).show(n=5, truncate=False, vertical=True)

-RECORD 0-------------------------------
 User_ID                    | 1000001   
 Product_ID                 | P00069042 
 Gender                     | F         
 Age                        | 0-17      
 Occupation                 | 10        
 City_Category              | A         
 Stay_In_Current_City_Years | 2         
 Marital_Status             | 0         
 Product_Category_1         | 3         
 Product_Category_2         | 0         
 Product_Category_3         | 0         
 Purchase                   | 8370      
-RECORD 1-------------------------------
 User_ID                    | 1000001   
 Product_ID                 | P00248942 
 Gender                     | F         
 Age                        | 0-17      
 Occupation                 | 10        
 City_Category              | A         
 Stay_In_Current_City_Years | 2         
 Marital_Status             | 0         
 Product_Category_1         | 1         
 Product_Category_2         | 6         
 Product_Categor

In [84]:
# Project three columns with SQL; the result is a regular Spark DataFrame.
sql_text = """
SELECT Occupation, City_Category, Marital_Status FROM tabla_temporanea
"""
df_sql = spark.sql(sql_text)

In [85]:
# First five rows of the SQL result.
df_sql.show(n=5)

+----------+-------------+--------------+
|Occupation|City_Category|Marital_Status|
+----------+-------------+--------------+
|        10|            A|             0|
|        10|            A|             0|
|        10|            A|             0|
|        10|            A|             0|
|        16|            C|             0|
+----------+-------------+--------------+
only showing top 5 rows



In [86]:
# spark.sql() returns a Spark DataFrame.
df_sql.__class__

pyspark.sql.dataframe.DataFrame

In [81]:
# List the tables/views visible in this session (the temp view shows isTemporary=true).
tables = spark.sql("SHOW TABLES;")
tables.show()

+--------+----------------+-----------+
|database|       tableName|isTemporary|
+--------+----------------+-----------+
|        |tabla_temporanea|       true|
+--------+----------------+-----------+



In [82]:
# List the databases (namespaces) known to the catalog.
databases = spark.sql("SHOW DATABASES")
databases.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [88]:
# Column names and data types of the temporary view.
spark.sql("DESCRIBE tabla_temporanea").show(n=20)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             User_ID|      int|   null|
|          Product_ID|   string|   null|
|              Gender|   string|   null|
|                 Age|   string|   null|
|          Occupation|      int|   null|
|       City_Category|   string|   null|
|Stay_In_Current_C...|   string|   null|
|      Marital_Status|      int|   null|
|  Product_Category_1|      int|   null|
|  Product_Category_2|      int|   null|
|  Product_Category_3|      int|   null|
|            Purchase|      int|   null|
+--------------------+---------+-------+



In [92]:
# Extended description of the temporary view.
spark.sql("DESCRIBE TABLE EXTENDED tabla_temporanea").show(20, True)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             User_ID|      int|   null|
|          Product_ID|   string|   null|
|              Gender|   string|   null|
|                 Age|   string|   null|
|          Occupation|      int|   null|
|       City_Category|   string|   null|
|Stay_In_Current_C...|   string|   null|
|      Marital_Status|      int|   null|
|  Product_Category_1|      int|   null|
|  Product_Category_2|      int|   null|
|  Product_Category_3|      int|   null|
|            Purchase|      int|   null|
+--------------------+---------+-------+

