# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect Jupyter notebooks to a Spark cluster to process data using Spark Python API.

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following params:

+ **appName:** application name displayed at the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, same used by Spark Workers;
+ **spark.executor.memory:** must be less than or equals to docker compose SPARK_WORKER_MEMORY config.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

22/09/19 21:03:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

More confs for SparkSession object in standalone mode can be added using the **config** method. Checkout the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

In [114]:
venta = spark.read.option("compression.codec", "snappy").option("mergeSchema", "true").parquet("data2/venta")

In [115]:
venta.count()

46645

In [116]:
len(venta.columns)

10

In [117]:
venta.printSchema()

root
 |-- idventa: integer (nullable = true)
 |-- fecha: date (nullable = true)
 |-- fecha_entrega: date (nullable = true)
 |-- idcanal: integer (nullable = true)
 |-- idcliente: integer (nullable = true)
 |-- idsucursal: integer (nullable = true)
 |-- idempleado: integer (nullable = true)
 |-- idproducto: integer (nullable = true)
 |-- precio: float (nullable = true)
 |-- cantidad: integer (nullable = true)



In [118]:
venta.show(n=10)

+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+
|idventa|     fecha|fecha_entrega|idcanal|idcliente|idsucursal|idempleado|idproducto|precio|cantidad|
+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+
|      1|2018-03-09|   2018-03-17|      3|      969|        13|      1674|     42817|813.12|       2|
|      2|2018-12-28|   2018-12-29|      2|      884|        13|      1674|     42795|543.18|       3|
|      3|2016-03-28|   2016-03-31|      2|     1722|        13|      1674|     42837|430.32|       1|
|      4|2017-10-23|   2017-10-24|      3|     2876|        13|      1674|     42834|818.84|       2|
|      5|2017-11-22|   2017-11-25|      2|      678|        13|      1674|     42825|554.18|       3|
|      6|2018-01-24|   2018-01-25|      2|     3263|        13|      1674|     42852| 152.0|       1|
|      7|2015-03-25|   2015-03-26|      3|     2983|        13|      1674|     429

In [119]:
# calculate statistics
ventas_out = venta.na.drop(subset=['precio','cantidad']).groupBy("idproducto").agg(mean(venta.precio).alias("promedio"), stddev(venta.precio).alias("stddev"))

In [120]:
ventas_out = ventas_out.withColumn("PrecioMaximo", ventas_out.promedio + ventas_out.stddev * 3).withColumn("PrecioMinimo", ventas_out.promedio - ventas_out.stddev * 3)

In [121]:
ventas_out.show(n=10)

+----------+------------------+------------------+------------------+-------------------+
|idproducto|          promedio|            stddev|      PrecioMaximo|       PrecioMinimo|
+----------+------------------+------------------+------------------+-------------------+
|     42834|1772.5477912454044| 8766.698479843624|28072.643230776277| -24527.54764828547|
|     43010| 579.5276381909548|2715.9365607484365| 8727.337320436265| -7568.282044054355|
|     42926| 173.2289156626506| 855.8609108933418| 2740.811648342676|-2394.3538170173747|
|     42754| 3399.485294117647|16178.246644473787|51934.225227539006| -45135.25463930372|
|     42942| 9817.857142857143| 43428.96544353547|140104.75347346353|-120469.03918774925|
|     42817|2294.6944737346626|10853.233513624373|34854.395014607784|-30265.006067138456|
|     43007| 883.8402366863905|  4370.23553541889| 13994.54684294306| -12226.86636957028|
|     42988|2085.4874371859296| 9288.874866279935|29952.112036025737|-25781.137161653878|
|     4278

In [122]:
ventas_out.write.option("compression.codec", "snappy").option("mergeSchema", "true").parquet("data2/venta_criterio_outliers", mode="overwrite")

                                                                                

In [123]:
venta = venta.alias("v").join(ventas_out.alias("o"), venta['idproducto'] == ventas_out['idproducto']).select("v.idventa","v.fecha","v.fecha_entrega","v.idcanal","v.idcliente","v.idsucursal","v.idempleado","v.idproducto","v.precio","v.cantidad","o.promedio","o.stddev","o.PrecioMaximo","o.PrecioMinimo")

In [124]:
venta.printSchema()

root
 |-- idventa: integer (nullable = true)
 |-- fecha: date (nullable = true)
 |-- fecha_entrega: date (nullable = true)
 |-- idcanal: integer (nullable = true)
 |-- idcliente: integer (nullable = true)
 |-- idsucursal: integer (nullable = true)
 |-- idempleado: integer (nullable = true)
 |-- idproducto: integer (nullable = true)
 |-- precio: float (nullable = true)
 |-- cantidad: integer (nullable = true)
 |-- promedio: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- PrecioMaximo: double (nullable = true)
 |-- PrecioMinimo: double (nullable = true)



In [125]:
venta.withColumn("PrecioMaximo",col("PrecioMaximo").cast("float"))

DataFrame[idventa: int, fecha: date, fecha_entrega: date, idcanal: int, idcliente: int, idsucursal: int, idempleado: int, idproducto: int, precio: float, cantidad: int, promedio: double, stddev: double, PrecioMaximo: float, PrecioMinimo: double]

In [126]:
venta.withColumn("PrecioMinimo",col("PrecioMinimo").cast("float"))

DataFrame[idventa: int, fecha: date, fecha_entrega: date, idcanal: int, idcliente: int, idsucursal: int, idempleado: int, idproducto: int, precio: float, cantidad: int, promedio: double, stddev: double, PrecioMaximo: double, PrecioMinimo: float]

In [127]:
def detecta_outlier(valor, maximo, minimo):
    return (valor < minimo) or (valor > maximo)

In [128]:
udf_detecta_outlier = udf(lambda valor, MaxLimit, MinLimit: detecta_outlier(valor, MaxLimit, MinLimit), BooleanType())

In [129]:
venta.count()

                                                                                

46645

In [130]:
venta = venta.na.drop(subset=['precio','cantidad'])

In [131]:
venta.count()

                                                                                

44845

In [132]:
venta = venta.withColumn("esOutlier", udf_detecta_outlier(venta.precio, venta.PrecioMaximo, venta.PrecioMinimo)).filter("NOT esOutlier")

In [133]:
venta.show(n=10)

                                                                                

+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+------------------+------------------+------------------+-------------------+---------+
|idventa|     fecha|fecha_entrega|idcanal|idcliente|idsucursal|idempleado|idproducto|precio|cantidad|          promedio|            stddev|      PrecioMaximo|       PrecioMinimo|esOutlier|
+-------+----------+-------------+-------+---------+----------+----------+----------+------+--------+------------------+------------------+------------------+-------------------+---------+
|      1|2018-03-09|   2018-03-17|      3|      969|        13|      1674|     42817|813.12|       2|2294.6944737346626|10853.233513624373|34854.395014607784|-30265.006067138456|    false|
|      2|2018-12-28|   2018-12-29|      2|      884|        13|      1674|     42795|543.18|       3| 2491.543036419412|10085.326471187416| 32747.52244998166| -27764.43637714284|    false|
|      3|2016-03-28|   2016-03-31|      2|     1722|   

In [134]:
venta = venta.select(["idventa","fecha","fecha_entrega","idcanal","idcliente","idsucursal","idempleado","idproducto","precio","cantidad"])

In [135]:
venta.write.option("compression.codec", "snappy").option("mergeSchema", "true").parquet("data2/venta_sin_outliers", mode="overwrite")

                                                                                