<h2> PySpark - First Touch </h2>

This notebook is a first hands-on look at PySpark, covering some basic, fundamental commands.

What is PySpark?
PySpark is the Python API for Spark.

Prerequisites for running the following code:

Spark, PySpark and Python installed and configured.

For the configuration, I suggest this guide: https://docs.anaconda.com/anaconda-scale/howto/spark-configuration/

Author: Luciano Nieto    |     Date: 05/06/20

In [1]:
import pyspark
from pyspark.sql import SparkSession

# The first step is to create a SparkSession. This is where the cluster is configured,
# e.g. the master URL and the memory allocated to each executor.

# Initialize the Spark session.
# master("local") runs Spark on this machine with a single worker thread.

spark = SparkSession.builder \
   .master("local") \
   .appName("My First App") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

sc = spark.sparkContext

In [2]:
# RDD: Resilient Distributed Dataset. Spark revolves around the concept of a resilient distributed dataset (RDD),
# which is a fault-tolerant collection of elements that can be operated on in parallel.
# There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing
# a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source
# offering a Hadoop InputFormat. Source: https://spark.apache.org/docs/latest/rdd-programming-guide.html

# The 2 ways to create an RDD:
#   sc.parallelize(): from a collection
#   sc.textFile():    from external data


# From a collection (the second argument, 2, is the number of partitions):
rdd = sc.parallelize([10,11,12,13,14],2)

# Show the first 2 elements of the RDD:
rdd.take(2)

[10, 11]

In [3]:
# RDD from external data:

# a) First, download the dataset. Source: https://www.kaggle.com/blastchar/telco-customer-churn/data
# b) Read the CSV file into an RDD (one element per line):

file = 'data/WA_Fn-UseC_-Telco-Customer-Churn.csv'  # <file location + filename>

rdd = sc.textFile(file)

# c) Show the first 3 elements of the RDD:
rdd.take(3)

['customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn',
 '7590-VHVEG,Female,0,Yes,No,1,No,No phone service,DSL,No,Yes,No,No,No,No,Month-to-month,Yes,Electronic check,29.85,29.85,No',
 '5575-GNVDE,Male,0,No,No,34,Yes,No,DSL,Yes,No,Yes,No,No,No,One year,No,Mailed check,56.95,1889.5,No']

_________________________________________________________________________________________________________


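<h4> RDD Transformation: produces a new RDD by applying an operation to an existing one. </h4>

  Transformations are lazy: Spark only records them, and nothing is computed until an action is called.

  Narrow Transformation: each partition of the new RDD depends on a single partition of its parent, so no data has to move between partitions.
  Commands: map, flatMap, mapPartitions, filter, sample, union...

  Wide Transformation: a partition of the new RDD may depend on many partitions of its parent, so the data is shuffled across partitions.
  Commands: intersection, distinct, reduceByKey, groupByKey, join, cartesian, repartition, coalesce (narrow unless shuffle=True)...


<h4> RDD Action: triggers the computation and returns a result to the driver program (or writes it to storage). </h4>
  Commands: reduce, collect, count, first, take, countByKey...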
  
More: https://spark.apache.org/docs/latest/rdd-programming-guide.html


_________________________________________________________________________________________________________


In [4]:
# RDD action:
# First element

rdd.first()

'customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,Churn'

In [5]:
# Length (number of characters) of the first 3 lines:

rdd.map(lambda s: len(s)).take(3)

[259, 122, 98]

In [6]:
# Split each line on commas. An RDD created with textFile() holds each line as a single string;
# map() turns every line into a list of fields. (A plain split(",") does not handle quoted fields that contain commas.)

rdd_splited = rdd.map(lambda line: line.split(","))
rdd_splited.take(2)

[['customerID',
  'gender',
  'SeniorCitizen',
  'Partner',
  'Dependents',
  'tenure',
  'PhoneService',
  'MultipleLines',
  'InternetService',
  'OnlineSecurity',
  'OnlineBackup',
  'DeviceProtection',
  'TechSupport',
  'StreamingTV',
  'StreamingMovies',
  'Contract',
  'PaperlessBilling',
  'PaymentMethod',
  'MonthlyCharges',
  'TotalCharges',
  'Churn'],
 ['7590-VHVEG',
  'Female',
  '0',
  'Yes',
  'No',
  '1',
  'No',
  'No phone service',
  'DSL',
  'No',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  'Month-to-month',
  'Yes',
  'Electronic check',
  '29.85',
  '29.85',
  'No']]

In [7]:
# filter: keep only the rows that satisfy a condition (here, Churn == "Yes"; Churn is field 20):

rdd_churns = rdd_splited.filter(lambda x: x[20] == "Yes")
rdd_churns.take(3)

[['3668-QPYBK',
  'Male',
  '0',
  'No',
  'No',
  '2',
  'Yes',
  'No',
  'DSL',
  'Yes',
  'Yes',
  'No',
  'No',
  'No',
  'No',
  'Month-to-month',
  'Yes',
  'Mailed check',
  '53.85',
  '108.15',
  'Yes'],
 ['9237-HQITU',
  'Female',
  '0',
  'No',
  'No',
  '2',
  'Yes',
  'No',
  'Fiber optic',
  'No',
  'No',
  'No',
  'No',
  'No',
  'No',
  'Month-to-month',
  'Yes',
  'Electronic check',
  '70.7',
  '151.65',
  'Yes'],
 ['9305-CDSKC',
  'Female',
  '0',
  'No',
  'No',
  '8',
  'Yes',
  'Yes',
  'Fiber optic',
  'No',
  'No',
  'Yes',
  'No',
  'Yes',
  'Yes',
  'Month-to-month',
  'Yes',
  'Electronic check',
  '99.65',
  '820.5',
  'Yes']]

In [8]:
# filter with a substring test on the raw, unsplit line:

rdd_fiber = rdd.filter(lambda x: "Fiber optic" in x)
rdd_fiber.take(3)

['9237-HQITU,Female,0,No,No,2,Yes,No,Fiber optic,No,No,No,No,No,No,Month-to-month,Yes,Electronic check,70.7,151.65,Yes',
 '9305-CDSKC,Female,0,No,No,8,Yes,Yes,Fiber optic,No,No,Yes,No,Yes,Yes,Month-to-month,Yes,Electronic check,99.65,820.5,Yes',
 '1452-KIOVK,Male,0,No,Yes,22,Yes,Yes,Fiber optic,No,Yes,No,No,Yes,No,Month-to-month,Yes,Credit card (automatic),89.1,1949.4,No']

In [9]:
# Sort by the TotalCharges column (field 19), descending. This needs the split RDD:
# on the raw `rdd`, line[19] is the 20th character of the line, not the 20th field.
# The header is skipped and the value converted to float so the sort is numeric rather
# than alphabetical (blank values are treated as 0.0).

header = rdd_splited.first()
rdd_sorted = (rdd_splited
              .filter(lambda fields: fields != header)
              .sortBy(lambda fields: float(fields[19].strip() or 0), ascending=False))
rdd_sorted.take(2)

In [10]:
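# Key-value pair RDDs: map each record to a (key, value) tuple to enable per-key operations
# such as reduceByKey, groupByKey and join. Here the key is fields 0-16 and the value is
# fields 18-19 (MonthlyCharges, TotalCharges); field 17 (PaymentMethod) is not included.
# A list is not hashable, so convert the key with tuple() before any *ByKey operation.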


rdd_2 = rdd_splited.map(lambda line: (line[0:17],line[18:20]))
rdd_2.take(2)

[(['customerID',
   'gender',
   'SeniorCitizen',
   'Partner',
   'Dependents',
   'tenure',
   'PhoneService',
   'MultipleLines',
   'InternetService',
   'OnlineSecurity',
   'OnlineBackup',
   'DeviceProtection',
   'TechSupport',
   'StreamingTV',
   'StreamingMovies',
   'Contract',
   'PaperlessBilling'],
  ['MonthlyCharges', 'TotalCharges']),
 (['7590-VHVEG',
   'Female',
   '0',
   'Yes',
   'No',
   '1',
   'No',
   'No phone service',
   'DSL',
   'No',
   'Yes',
   'No',
   'No',
   'No',
   'No',
   'Month-to-month',
   'Yes'],
  ['29.85', '29.85'])]

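<h3> RDD vs. DataFrame vs. Dataset </h3>

RDD: the primary user-facing API in Spark since the beginning (about 2011).

DataFrame: a distributed collection of Row objects organized into named columns; supports UDFs and is optimized by a logical plan optimizer. Introduced in Spark 1.3 (2015).

Dataset: introduced in Spark 1.6 (2016) and unified with DataFrame in Spark 2.0 (in Scala, a DataFrame is a Dataset[Row]). A strongly typed API that keeps the performance benefits of Spark SQL's optimized execution engine. Available only in Scala and Java (as of this writing).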

Source: https://spark.apache.org/docs/latest/sql-programming-guide.html

<h3> PySpark DataFrames </h3>

In [11]:
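# There are several ways to create DataFrames in PySpark. Let's look at a few:

# a) create a DataFrame from an RDD, using the header line for the column names
#    (without arguments, toDF() names the columns _1, _2, ... and keeps the header as a data row):

header = rdd_splited.first()
df = rdd_splited.filter(lambda fields: fields != header).toDF(header)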

In [12]:
#b) create dataframe based on a collection:

c = [(1,2,3),(4,5,6),(7,8,9),(10,11,12),(13,14,15)]
df = spark.createDataFrame(c)
df.head(2)

[Row(_1=1, _2=2, _3=3), Row(_1=4, _2=5, _3=6)]

In [13]:
# c) create a DataFrame from an external CSV file (with inferSchema=False, every column is read as a string):

df = spark.read.csv(file, header=True, sep=',', inferSchema=False)
df.head(2)

[Row(customerID='7590-VHVEG', gender='Female', SeniorCitizen='0', Partner='Yes', Dependents='No', tenure='1', PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='No', OnlineBackup='Yes', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Electronic check', MonthlyCharges='29.85', TotalCharges='29.85', Churn='No'),
 Row(customerID='5575-GNVDE', gender='Male', SeniorCitizen='0', Partner='No', Dependents='No', tenure='34', PhoneService='Yes', MultipleLines='No', InternetService='DSL', OnlineSecurity='Yes', OnlineBackup='No', DeviceProtection='Yes', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='One year', PaperlessBilling='No', PaymentMethod='Mailed check', MonthlyCharges='56.95', TotalCharges='1889.5', Churn='No')]

In [14]:
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [15]:
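# Select a column. Column names are case-insensitive by default (spark.sql.caseSensitive=false),
# so "CustomerID" resolves to the customerID column.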

df.select("CustomerID").take(2)

[Row(CustomerID='7590-VHVEG'), Row(CustomerID='5575-GNVDE')]

In [16]:
# Distinct values of a column:

df.select("gender").distinct().take(2)

[Row(gender='Female'), Row(gender='Male')]

In [17]:
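# Group by gender and aggregate: sum of TotalCharges per group.
# TotalCharges is a string column; Spark implicitly casts it to double for the sum.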

df.groupby("gender").agg({"TotalCharges":"sum"}).show()

+------+-----------------+
|gender|sum(TotalCharges)|
+------+-----------------+
|Female|7952354.199999998|
|  Male|8103814.499999997|
+------+-----------------+



In [19]:
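# Order the DataFrame by TotalCharges, descending. TotalCharges was read as a string
# (inferSchema=False), so this order is alphabetical, which is why '999.9' comes first;
# cast the column, e.g. df.TotalCharges.cast("double").desc(), for a numeric order.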

df.orderBy(df.TotalCharges.desc()).take(3)

[Row(customerID='9093-FPDLG', gender='Female', SeniorCitizen='0', Partner='No', Dependents='No', tenure='11', PhoneService='Yes', MultipleLines='No', InternetService='Fiber optic', OnlineSecurity='No', OnlineBackup='Yes', DeviceProtection='Yes', TechSupport='Yes', StreamingTV='No', StreamingMovies='Yes', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Electronic check', MonthlyCharges='94.2', TotalCharges='999.9', Churn='No'),
 Row(customerID='4536-PLEQY', gender='Male', SeniorCitizen='0', Partner='Yes', Dependents='No', tenure='12', PhoneService='Yes', MultipleLines='No', InternetService='Fiber optic', OnlineSecurity='No', OnlineBackup='Yes', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='Yes', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Credit card (automatic)', MonthlyCharges='85.05', TotalCharges='999.8', Churn='No'),
 Row(customerID='5899-MQZZL', gender='Female', SeniorCitizen='0', Partner='No', Dependents='No', 

In [20]:
# More DataFrame functions: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html