# Data Types and Aggregations

In this lesson we will work with different datatypes of data and we will perform some aggregations on data.

## Summary
- <a href='#1'>1. Context and Motivation</a>
- <a href='#2'>2. Working with different Data Types</a>
    - <a href='#2.1'>2.1. Booleans</a>
    - <a href='#2.2'>2.2. Numbers</a>
    - <a href='#2.3'>2.3. Strings</a>
    - <a href='#2.4'>2.4. Dates and timestamps</a>
    - <a href='#2.5'>2.5. Null values</a>
    - <a href='#2.6'>2.6. Complex Types </a>
    - <a href='#2.7'>2.7. User Defined functions</a>
- <a href='#4'>4.  Exercises</a>
    - <a href='#4.1'>4.1. EDA</a>
    - <a href='#4.2'>4.2. Clustering</a>
    - <a href='#4.3'>4.3. Evaluation</a>
- <a href='#5'>5.  References</a>

# <a id='1'>1. Context and Motivation</a>

When we have a dataset with multiple data types, we need work with the column types specified in the dataset to do that we need to know how to work with datatypes in spark.

Aggregation is an act of collecting something together. We need to perform aggregations to transform one or more columns, grouping data and view the data the way we want.

# <a id='2'>2. Working with different Data Types</a>

In this chapter we will continue analyse and transform a **retail dataset**. 

The file must be on hdfs however if not you have to put it there using these instructions:
* Open a new terminal. 
* Run `hdfs dfs -put retail_data_2010-12-01.csv`.

In [None]:
#Read the dataset 
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema","true")\
.load("retail_data_2010-12-01.csv")
df.printSchema()

In [None]:
df.createOrReplaceTempView("dfTable") # create a new view for the data is like a view in sql 

### Lit function

`pyspark.sql.functions.lit(col)`
* Creates a Column of literal value. 
* See https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [None]:
from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))

## <a id='2.1'>2.1. Booleans</a>

Booleans are the foundation of all **filtering**.  
Boolean statements consist of four elements: **and, or, true, and false**.   
We use these simple structures to build logical statements that evaluate to either **true or false**. 

In [None]:
# in Python
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5, True) # What happens when this parameter is True or False? 

In [None]:
df.where("InvoiceNo <> 536365").show(5, True) # Same way to express difference

In [None]:
df.where("InvoiceNo <> 536365").explain() # Same physical plan will return same results

In [None]:
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description").explain()# Same physical plan will return same results

Function **instr(str, substr)** 
* Returns the (1-based) index of the first occurrence of substr in str. 
* See https://spark.apache.org/docs/2.3.0/api/sql/index.html#instr

In [None]:
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show() 

In [None]:
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).explain()

In [None]:
DOTCodeFilter = col("StockCode") == "DOT" # Filter if StockCode contains "DOT"
priceFilter = col("UnitPrice") > 600 # Filter if UnitPrice has more than 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1 # Filter if Description has POSTAGE

In [None]:
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5) # Creates a new column is expensive based on the filters

In [None]:
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").explain()

In [None]:
DOTCodeFilter = col("StockCode") == "DOT" # Filter if StockCode contains "DOT"

df.withColumn("isExpensive", DOTCodeFilter)\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5) # Creates a new column is expensive based on the filters

In [None]:
expr("NOT UnitPrice <= 250") # Check the expression before apply it

In [None]:
from pyspark.sql.functions import expr
df.withColumn("isExpensive", expr("NOT UnitPrice <= 500"))\
.where("isExpensive")\
.select("Description", "UnitPrice").show(5)

## <a id='2.2'>2.2. Numbers</a>

The second most common task you will do after filtering things is counting things. For the most part, we simply need to express our computation, and that should be valid assuming that we’re working with numerical data types.

Function **pow(expr1, expr2)** 
* Raises expr1 to the power of expr2. 
* See https://spark.apache.org/docs/2.3.0/api/sql/#pow 

In [None]:
from pyspark.sql.functions import pow

fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(5)

In [None]:
df.selectExpr("CustomerId","(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2) # Using sql

In [None]:
df.selectExpr("CustomerId","(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").explain()
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).explain()

Function **round(expr, d)** 
* Returns expr rounded to d decimal places using HALF_UP rounding mode. 
* https://spark.apache.org/docs/2.3.0/api/sql/#round

Function **bround(expr, d)** 
* Returns expr rounded to d decimal places using HALF_EVEN rounding mode.. 
* https://spark.apache.org/docs/2.3.0/api/sql/#bround

In [None]:
from pyspark.sql.functions import lit, round, bround

In [None]:
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)


Function **corr(col1, col2, method=None)**   
Indicates the correlation between two columns. The output is between -1 and 1.
* -1 indicates lower correlation
* 1 indicates higher correlation
https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html
https://www.spss-tutorials.com/pearson-correlation-coefficient/

In [None]:
from pyspark.sql.functions import corr
print(df.stat.corr("Quantity", "UnitPrice")) # 
df.select(corr("Quantity", "UnitPrice")).show() # Check if cheaper things are brought together. What is the answer?

In [None]:
from pyspark.sql.functions import count, mean, stddev_pop, min, max  # same as describe function one by one

Function **approxQuantile(col, probabilities, relativeError)** 
* Calculates the approximate quantiles of numerical columns of a DataFrame. 
* https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html

In [None]:
colName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile("UnitPrice", quantileProbs, relError)  # Dataframe method not needed to be imported

Function **freqItems(cols, support=None)** 
* Finding frequent items for columns. 
* https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html

In [None]:
df.stat.freqItems(["StockCode", "Quantity"],0.1).show(100000000,False)

Function **monotonically_increasing_id** 
* Returns monotonically increasing 64-bit integers. 
* https://spark.apache.org/docs/2.3.0/api/sql/#monotonically_increasing_id

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2) # create indices for the dataset

## <a id='2.3'>2.3. Strings</a>

Function **initcap(str)** 
* Returns str with the first letter of each word in uppercase.
* https://spark.apache.org/docs/2.3.0/api/sql/#initcap

In [None]:
from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show(5)

In [None]:
from pyspark.sql.functions import lower, upper

In [None]:
df.select(col("Description"),
lower(col("Description")),
upper(lower(col("Description")))).show(2)

* **ltrim** -> https://spark.apache.org/docs/2.3.0/api/sql/#ltrim  
* **rtrim** -> https://spark.apache.org/docs/2.3.0/api/sql/#rtrim  
* **lpad** -> https://spark.apache.org/docs/2.3.0/api/sql/#lpad  
* **trim** -> https://spark.apache.org/docs/2.3.0/api/sql/#trim  

In [None]:
from pyspark.sql.functions import ltrim, rtrim, rpad, lpad, trim

In [None]:
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

### **Regular Expressions**

Probably one of the most frequently performed tasks is searching for the existence of one string
in another or replacing all mentions of a string with another value.   
This is done using `Regular Expressions`

Function **regexp_replace(str, regexp, rep)** 
* Replaces all substrings of str that match regexp with rep.
* See https://spark.apache.org/docs/2.3.0/api/sql/#regexp_replace

In [None]:
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),col("Description")).show(5)

Function **translate(input, from, to)** 
* Translates the input string by replacing the characters present in the from string with the corresponding characters in the to string. 
* See https://spark.apache.org/docs/2.3.0/api/sql/#translate

In [None]:
from pyspark.sql.functions import translate
df.select(translate(col("Description"), "LEET", "1337"),col("Description"))\
.show()

In [None]:
containsBlack = instr(col("Description"), "BLACK") >= 1
#containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
.where("hasSimpleColor")\
.select("Description").show(5, False)

Function **locate(substr, str[, pos])** 
* Returns the position of the first occurrence of substr in str after position pos.
* See https://spark.apache.org/docs/2.3.0/api/sql/#locate

In [None]:
from pyspark.sql.functions import expr, locate

In [None]:
def color_locator(column, color_string):
    return locate(color_string.upper(), column).cast("boolean").alias("is_" + color_string)

In [None]:
simpleColors = ["black", "white", "red", "green", "blue"]

In [None]:
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]

In [None]:
selectedColumns

In [None]:
selectedColumns.append(expr("*")) # has to a be Column type

In [None]:
selectedColumns

In [None]:
df.select(*selectedColumns).where(expr("is_white OR is_red"))\
.select("Description").show(20, False)

## <a id='2.4'>2.4. Working with Dates and Timestamps</a>

Dates and Timestamps has many challenges.  
It’s always necessary to keep track of timezones and ensure that formats are correct and valid.

Spark has **dates**, which focus exclusively on calendar dates, and **timestamps**, which include both date
and time information.

Spark’s TimestampType class supports only second-level precision, which means that if
we are going to be working with milliseconds or microseconds, we will need to work around this
problem by potentially operating on them as longs.

Function **current_date()** 
* Returns the current date at the start of query evaluation.
* See https://spark.apache.org/docs/2.3.0/api/sql/#current_date

Function **current_timestamp()** 
* Returns the current timestamp at the start of query evaluation.
* See https://spark.apache.org/docs/2.3.0/api/sql/#current_timestamp

In [None]:
from pyspark.sql.functions import current_date, current_timestamp

In [None]:
# in Python
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())# creates a Dataframe with ID, Current Date and current timestamp with 10 equal entries
dateDF.printSchema()

In [None]:
dateDF.show(5,False)

Function **date_add(start_date, num_days)** 
* Returns the date that is num_days after start_date.
* See https://spark.apache.org/docs/2.4.0/api/sql/#date_add

Function **date_sub(start_date, num_days)** 
* Returns the date that is num_days before start_date.
* See https://spark.apache.org/docs/2.4.0/api/sql/#date_sub



In [None]:
from pyspark.sql.functions import date_add, date_sub

In [None]:
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1) # Add and subtract 5 days

Function **datediff(endDate, startDate)** 
* Returns the number of days from startDate to endDate.
* See https://spark.apache.org/docs/2.3.0/api/sql/#datediff
    
Function **months_between(timestamp1, timestamp2)** 
* Returns number of months between timestamp1 and timestamp2.
* See https://spark.apache.org/docs/2.3.0/api/sql/#months_between
  
Function **to_date(date_str[, fmt])** 
* Parses the date_str expression with the fmt expression to a date. Returns null with invalid input. By default, it follows casting rules to a date if the fmt is omitted.
* See https://spark.apache.org/docs/2.3.0/api/sql/#to_date

In [None]:
from pyspark.sql.functions import datediff, months_between, to_date

In [None]:
dateDF.withColumn("week_ago", date_sub(col("today"), 7)).select(datediff(col("week_ago"), col("today"))).show(1)

In [None]:
dateDF.select(
to_date(lit("2018-12-02")).alias("end"),
to_date(lit("2019-05-22")).alias("start"))\
.select(months_between(col("start"), col("end"))).show(1)

In [None]:
dateDF.select(to_date(lit("2018-20-12")),to_date(lit("2018-12-11"))).show(1) # spark will not return an error instead create a null value

In [None]:
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(
to_date(lit("2018-12-11"), dateFormat).alias("date"),
to_date(lit("2018-20-12"), dateFormat).alias("date2"))

In [None]:
cleanDateDF.show()

In [None]:
# convert into timestamp 
from pyspark.sql.functions import to_timestamp
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

In [None]:
cleanDateDF.filter(col("date2") > lit("2018-12-12")).show() # compare dates

## <a id='2.5'>2.5. Nulls Values</a>

As best practice we should always Sometimes we get null values to let know spark how to handle it internally

In [None]:
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"), col("CustomerId"))).show(20,False)

In [None]:
#Removes rows that contain nulls. 
#The default is to drop any row in which any value is null:
df.na.drop() 

In [None]:
#Drops a row if any of the values are null.
df.na.drop("any")

In [None]:
# Drops the row only if all values are null or NaN for that row
df.na.drop("all") 

In [None]:
# Delete the rows which has null values in StockCode and InvoceNo simultaneously
df.na.drop("all", subset=["StockCode", "InvoiceNo"]) 

### Fill   
Using the fill function, you can fill one or more columns with a set of values.    
This can be done by specifying a map—that is a particular value and a set of columns.

In [None]:
df.na.fill("All Null values become this string")

In [None]:
df.na.fill("all", subset=["StockCode", "InvoiceNo"])

In [None]:
fill_cols_vals = {"StockCode": 5, "Description" : "No Value"}
df.na.fill(fill_cols_vals).show(5)

### Replace
Using the fill function, you can fill one or more columns with a set of values.    
This can be done by specifying a map—that is a particular value and a set of columns.

In [None]:
df.na.replace([""], ["UNKNOWN"], "Description").show()

## <a id='2.6'>2.6. Complex Types</a>

### Split

In [None]:
from pyspark.sql.functions import split

In [None]:
df.select(split(col("Description"), " ")).show(2)

In [None]:
df.select(split(col("Description"), " ").alias("array_col"))\
.selectExpr("array_col[0]").show(2)

### Array Length

In [None]:
from pyspark.sql.functions import size

In [None]:
df.select(size(split(col("Description"), " "))).show(2) # shows 5 and 3

### Array_contains

In [None]:
from pyspark.sql.functions import array_contains

In [None]:
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

### Explode

The explode function takes a column that consists of arrays and creates one row (with the rest of
the values duplicated) per value in the array. 

In [None]:
from pyspark.sql.functions import split, explode

In [None]:
df.withColumn("splitted", split(col("Description"), " "))\
.withColumn("exploded", explode(col("splitted")))\
.select("Description", "InvoiceNo", "exploded").show(2)

### Maps

In [None]:
from pyspark.sql.functions import create_map
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(5,False)

In [None]:
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
.selectExpr("complex_map['WHITE METAL LANTERN']").show(5)

In [None]:
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
.selectExpr("explode(complex_map)").show(5,False)

## <a id='2.7'>2.7. User Defined functions</a>

Spark will serialize the function on the driver and transfer it over the network to all executor processes.
Spark starts a Python process on the worker, serializes all of the data to a format that Python can understand, executes the function row by row on that data in the Python process, and then finally returns the results of the row operations to the JVM and Spark.

In [None]:
udfExampleDF = spark.range(5).toDF("num")

In [None]:
def power3(double_value):
    return double_value ** 3

In [None]:
from pyspark.sql.functions import udf
power3udf = udf(power3)

In [None]:
from pyspark.sql.functions import col
udfExampleDF.select(power3udf(col("num"))).show(10)

In [None]:
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register("power3py", power3, IntegerType()) # register the function

In [None]:
# Use the function as SQL expression 

In [None]:
udfExampleDF.selectExpr("power3py(num)").show()

# <a id='4'>4. Exercises</a>

# Customer Churn

Customer churn, also known as customer attrition, customer turnover, or customer defection, is the loss of clients or customers.

Telephone service companies, Internet service providers, pay TV companies, insurance firms, and alarm monitoring services, often use customer churn analysis and customer churn rates as one of their key business metrics because the cost of retaining an existing customer is far less than acquiring a new one. Companies from these sectors often have customer service branches which attempt to win back defecting clients, because recovered long-term customers can be worth much more to a company than newly recruited clients.

Companies usually make a distinction between voluntary churn and involuntary churn. Voluntary churn occurs due to a decision by the customer to switch to another company or service provider, involuntary churn occurs due to circumstances such as a customer's relocation to a long-term care facility, death, or the relocation to a distant location. In most applications, involuntary reasons for churn are excluded from the analytical models. Analysts tend to concentrate on voluntary churn, because it typically occurs due to factors of the company-customer relationship which companies control, such as how billing interactions are handled or how after-sales help is provided.

Predictive analytics use churn prediction models that predict customer churn by assessing their propensity of risk to churn. Since these models generate a small prioritized list of potential defectors, they are effective at focusing customer retention marketing programs on the subset of the customer base who are most vulnerable to churn.

## Column Description   

| Column     | Type       | Description |
|--------  |---------  |: --------- |
| **customerID** | String | Customer ID |
| **gender** | String | Whether the customer is a male or a female |
| **SeniorCitizen** | Integer | Whether the customer is a senior citizen or not (1, 0) |
| **Partner** | String | Whether the customer has a partner or not (Yes, No) |
| **Dependents** | String | Whether the customer has dependents or not (Yes, No) |
| **tenure** | Integer | Number of months the customer has stayed with the company |
| **PhoneService** | String | Whether the customer has a phone service or not (Yes, No) |
| **MultipleLines** | String | Whether the customer has multiple lines or not (Yes, No, No phone service) |
| **InternetService** | String | Customer’s internet service provider (DSL, Fiber optic, No) |
| **OnlineSecurity** | String | Whether the customer has online security or not (Yes, No, No internet service) |
| **OnlineBackup** | String | Whether the customer has online backup or not (Yes, No, No internet service) |
| **DeviceProtection** | String | Whether the customer has device protection or not (Yes, No, No internet service) |
| **TechSupport** | String | Whether the customer has tech support or not (Yes, No, No internet service) |
| **StreamingTV** | String | Whether the customer has streaming movies or not (Yes, No, No internet service) |
| **StreamingMovies** | String | Whether the customer has a partner or not (Yes, No) |
| **Contract** | String | The contract term of the customer (Month-to-month, One year, Two year) |
| **PaperlessBilling** | String | Whether the customer has paperless billing or not (Yes, No) |
| **PaymentMethod** | String | The customer’s payment method (Electronic check, Mailed check, Bank transfer (automatic), Credit card (automatic)) |
| **MonthlyCharges** | Double | The amount charged to the customer monthly |
| **TotalCharges** | String | The total amount charged to the customer |
| **Churn** | String | Whether the customer churned or not (Yes or No) |

For exercises we will continue analyse the churn dataset in order to  create some **clusters** based on the information provided

Run the command command `hdfs dfs -put WA_Fn-UseC_-Telco-Customer-Churn.csv` to put the dataframe in hfds.

In [None]:
# Create the dataframe.
df = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("WA_Fn-UseC_-Telco-Customer-Churn.csv")

In [None]:
df.printSchema()

## <a id='4.1'>4.1. EDA - Exploratory data analysis</a>

In [None]:
# Transform columns that is in strings witch don't have a description in integers

In [None]:
# Check check the relation between variables using sns pairplot
import seaborn as sns 
sns.set(style="ticks", color_codes=True)
df = df.toPandas()
pairplot = sns.pairplot(df, hue="Churn")

data_df.printSchema()

In [None]:
# Count the number of columns.

In [None]:
# Count the number of rows.

In [None]:
# Check the min, max and metrics using describe field

## <a id='4.2'>4.2. Clustering</a>

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
va = VectorAssembler()\
.setInputCols(["MonthlyCharges","tenure"])\
.setOutputCol("features")

churn = va.transform(spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("WA_Fn-UseC_-Telco-Customer-Churn.csv"))

churn.cache()

In [None]:
km = KMeans().setK(4)
print (km.explainParams())
kmModel = km.fit(churn)
# Make predictions
predictions = kmModel.transform(churn)

## <a id='4.3'>4.3. Evaluation</a>

In [None]:
summary = kmModel.summary
print (summary.clusterSizes) # number of points
kmModel.computeCost(churn)
centers = kmModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

In [None]:
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

In [None]:
transformed = kmModel.transform(churn).select("features", "prediction")

In [None]:
transformed.show()

In [None]:
df.show(5)

# <a id='5'>5. References</a>

https://spark.apache.org/docs/2.3.0/api/sql/

https://spark.apache.org/docs/2.2.0/api/python/

https://spark.apache.org/docs/latest/ml-clustering.html