## Spark Crash Course


**What you will learn today:**
* How to load (extract) data into Spark
* How to understand the data using different plots
* Spark structured and unstructured APIs
* Spark DataFrames: transformations and actions
* An introduction to Spark SQL

Ref: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html

[DataFrame FAQs](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html#dataframe-faqs)

## The Data

The dataset contains bike rental info from 2011 and 2012 in the Capital Bikeshare system, plus additional relevant information such as weather.

This dataset is from Fanaee-T and Gama (2013) and is hosted by the <a href="http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset" target="_blank">UCI Machine Learning Repository</a>.

In [0]:
display(dbutils.fs.ls("dbfs:/FileStore/tables/"))

| path | name | size (bytes) |
|---|---|---|
| dbfs:/FileStore/tables/allstate/ | allstate/ | 0 |
| dbfs:/FileStore/tables/clustering/ | clustering/ | 0 |
| dbfs:/FileStore/tables/day.csv | day.csv | 57569 |
| dbfs:/FileStore/tables/hour.csv | hour.csv | 1156736 |
| dbfs:/FileStore/tables/orangeTelecom/ | orangeTelecom/ | 0 |
| dbfs:/FileStore/tables/sample_linear_regression_data.txt | sample_linear_regression_data.txt | 119069 |
| dbfs:/FileStore/tables/stopwords.txt | stopwords.txt | 2557 |
| dbfs:/FileStore/tables/testing_bot_data.csv | testing_bot_data.csv | 980391 |
| dbfs:/FileStore/tables/training_bot_data.csv | training_bot_data.csv | 5745947 |
| dbfs:/FileStore/tables/tweet_data.csv | tweet_data.csv | 227655107 |


### Spark APIs 

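1. Create a SparkContext: the connection to the cluster. It is required for executing operations (transformations + actions).
2. Create a SparkSession: the entry point to the DataFrame and SQL APIs, living in the driver process. Since Spark 2.0, building a SparkSession also creates (or reuses) a SparkContext, available as `spark.sparkContext`.
Link - https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#starting-point-sparksession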

1. Low-level unstructured APIs
    * RDDs
2. Higher-level structured APIs
    * DataFrame: table of data with rows and named columns
    * SQL
    * Dataset: type-safe (available in Scala and Java only, not in Python)
    
Note: there is no performance difference between writing DataFrame transformations and SQL queries.
Both "compile" to the same underlying execution plan.

In [0]:
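from pyspark.sql import SparkSession

# Databricks notebooks already provide a SparkSession named `spark`
spark

# Outside Databricks, create (or reuse) one yourself:
# spark = SparkSession \
#     .builder \
#     .appName("Python Spark SQL basic example") \
#     .config("spark.some.config.option", "some-value") \
#     .master("local[*]") \
#     .getOrCreate()
# .master() selects the resource manager: "local[*]" runs locally, "yarn" uses a YARN cluster.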

### DataFrame

Why DataFrame?

In Apache Spark, a DataFrame is a distributed collection of rows organized into named columns (like a table in SQL or an Excel sheet).
1. Immutable: we can create DataFrames/RDDs but cannot change them. Every transformation returns a new one.
     * DF1 --> Transformation --> DF2
     * Lineage can be tracked easily
2. Lazy evaluation: transformations are not executed until an action is performed.
3. Distributed: the data is split into partitions spread across the cluster.
4. Resilient: a lost partition can be recomputed from its lineage (replication is optional).
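The first three properties can be made concrete with a toy model in plain Python. This is not how Spark is implemented, and `ToyFrame` is a hypothetical name. Here, transformations such as `filter` and `map` only record a step and return a new object. The work happens only when an action like `collect()` or `count()` runs.

```python
# Toy model in plain Python (NOT Spark's implementation) of:
#  - immutability: every transformation returns a NEW ToyFrame
#  - lineage: each ToyFrame records the steps that produced it
#  - lazy evaluation: nothing runs until an action (collect/count) is called
class ToyFrame:
    def __init__(self, source, steps=()):
        self._source = source       # original data, never modified
        self.steps = tuple(steps)   # lineage: the recorded transformations

    def filter(self, pred):
        # Transformation: record the step, return a new ToyFrame
        return ToyFrame(self._source, self.steps + (("filter", pred),))

    def map(self, fn):
        # Transformation: record the step, return a new ToyFrame
        return ToyFrame(self._source, self.steps + (("map", fn),))

    def collect(self):
        # Action: replay the lineage over the source data now
        rows = list(self._source)
        for kind, fn in self.steps:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            else:
                rows = [fn(r) for r in rows]
        return rows

    def count(self):
        # Action built on top of collect()
        return len(self.collect())


calls = []  # records every time the map function actually runs

def times_ten(x):
    calls.append(x)
    return x * 10

df1 = ToyFrame(range(10))
df2 = df1.filter(lambda x: x % 2 == 0).map(times_ten)

print(calls)                            # [] : nothing has run yet
print(df2.collect())                    # [0, 20, 40, 60, 80]
print(len(df1.steps), len(df2.steps))   # 0 2 : df1 is unchanged
```

The model's `collect()` replays the recorded `steps` from the untouched source. In spirit, this is how Spark can rebuild a lost partition from its lineage.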

NOTE : 

* For implicit conversions such as turning RDDs into DataFrames, Scala uses `import spark.implicits._`. In PySpark, call `rdd.toDF()` or `spark.createDataFrame(rdd)` instead.

* You will find functionality/transformations similar to those of a pandas DataFrame.

But the major differences are:
1. Tasks run in parallel on different nodes
2. Lazy evaluation
3. DataFrames are immutable
4. The pandas API supports more (statistical) operations than the PySpark DataFrame.

Spark ML uses the DataFrame from Spark SQL as an ML dataset. A single DataFrame can hold columns of many data types, for example:
```
DataFrame[col_TEXT,
          col_NUMERICAL,
          col_ARRAY,
          col_MAP,
          col_FEATURE_VECTOR,
          col_TRUE_LABELS,
          col_PREDICTIONS]
```

#### You can also load data from
* Different data formats
    * JSON, CSV, Parquet, ORC, Avro, LibSVM, Image
* Different data stores
    * MySQL, HBase, Hive, Cassandra, MongoDB, Kafka, Elasticsearch (several of these through external connectors)

https://spark.apache.org/docs/latest/sql-data-sources.html

Example paths:
* hdfs://FileStore/tables/
* s3://FileStore/tables/hour.csv
* dbfs:/FileStore/tables/hour.csv

#### Create a DataFrame from a file

```python
# from a JSON file
spark.read.json("path_file.json")
# from a text file (one row per line, in a single "value" column)
spark.read.text("path_file.txt")
# from a CSV file
spark.read.csv("path_file.csv")
```

In [0]:
fileName = "dbfs:/FileStore/tables/hour.csv"

initialDF = (spark.read          # Our DataFrameReader
  .option("header", "true")      # Let Spark know we have a header
  .option("inferSchema", "true") # Inferring the schema (it is a small dataset)
  .csv(fileName)                 # Location of our data
  .cache()                       # Mark the DataFrame as cached.
)

initialDF.count()                # Materialize the cache

initialDF.printSchema()

In [0]:
initialDF.explain()

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Map the numeric weather code to a readable label
def weathersit_values(r):
    if r == 1 : return "Clear"
    elif r == 2 : return "Mist"
    elif r == 3 : return "Light Rain-Snow"
    else: return "Heavy Rain"

# Map the numeric season code to a readable label
def season_values(r):
    if r == 1 : return "spring"
    elif r == 2 : return "summer"
    elif r == 3 : return "fall"
    else: return "winter"

# Wrap the Python functions as Spark UDFs that return strings
weathersitTransform = udf(weathersit_values, StringType())
seasonTransform = udf(season_values, StringType())

# Replace the numeric columns with their labels (returns a new DataFrame)
newDF = (initialDF
    .withColumn("weathersit", weathersitTransform(initialDF.weathersit))
    .withColumn("season", seasonTransform(initialDF.season)))
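A UDF only wraps an ordinary Python function, so the mapping logic can be checked in plain Python before it ever runs on a cluster. A dictionary lookup is one common alternative to the if/elif chain. The sketch below uses the hypothetical names `SEASON_LABELS` and `season_label`. It mirrors `season_values`, including the fallback to `"winter"` for any other code.

```python
# Hypothetical dict-based lookup equivalent to season_values
SEASON_LABELS = {1: "spring", 2: "summer", 3: "fall", 4: "winter"}

def season_label(code):
    # Same fallback as season_values: any other code maps to "winter"
    return SEASON_LABELS.get(code, "winter")

# Copy of the notebook's original function, so the two can be compared
def season_values(r):
    if r == 1: return "spring"
    elif r == 2: return "summer"
    elif r == 3: return "fall"
    else: return "winter"

# Plain-Python check: both versions agree, including unexpected inputs
for code in [1, 2, 3, 4, 0, 7, None]:
    assert season_label(code) == season_values(code), code
print("season_label matches season_values")
```

`season_label` could be wrapped with `udf(season_label, StringType())` in the same way as `season_values`. For a fixed lookup like this, Spark's built-in `when(...).otherwise(...)` column expressions are generally faster, because they avoid shipping rows to a Python worker.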

### Display Data in different Styles

In [0]:
initialDF.printSchema()

In [0]:
newDF.printSchema()

In [0]:
# Display first Row
initialDF.first()

In [0]:
# Display First 5 Rows
initialDF.head(5)

In [0]:
# Results in Table format
initialDF.show(2,truncate= True)

#### Range

In [0]:
myRangeDF = spark.range(1000)
myRangeDF.show()
myRangeDF.show(4,truncate= True)

### Different Ways to Access Columns

In [0]:
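# Explicit imports: a wildcard import would shadow Python built-ins such as max, min and sum
# (avg is also used later, in the missing-data section)
from pyspark.sql.functions import expr, col, column, avg
from pyspark.sql.types import StringType

# Scala equivalent: import org.apache.spark.sql.functions.{expr, col, column}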

newDF.select(
  expr("season"),
  col("season"),
  column("season"))\
  .show(3)


### How many Rows and Columns

In [0]:
# How many rows ?
newDF.count()

In [0]:
#  How many columns ?
len(newDF.columns)

### Select Columns

In [0]:
# How to Select column(s)  
# first 5 rows 
newDF.select("atemp","temp","windspeed").show(5)

### Distinct values

In [0]:
# Distinct values
newDF.select("season").distinct().show()

# Count of distinct values
newDF.select("windspeed").distinct().count()

In [0]:
# check whether the DataFrame is empty
newDF.rdd.isEmpty()

In [0]:
# convert a DataFrame to an RDD (of Row objects)
rdd_df = newDF.rdd

# convert the RDD back to a DataFrame
rdd_df.toDF()

### Filter Rows

In [0]:
# keep every row whose season is NOT "spring"
notSpringDF = newDF.filter(newDF.season != "spring")
notSpringDF.show()

In [0]:
newDF.filter(newDF.season == "summer").show()          # exact match
newDF.filter(newDF.season.contains("mmer")).show()     # substring match
newDF.filter(newDF.season.like("%mmer")).show()        # SQL LIKE pattern

### Type Casting

Syntax - dataFrame["columnName"].cast(DataType())

In [0]:
newDF.schema
# newDF.printSchema()

In [0]:
newDF.select("cnt").schema

In [0]:
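from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

totalCnt = col("cnt")
# cast() returns a new column; printSchema() shows that cnt is now a double
newDF.select(totalCnt.cast(DoubleType())).printSchema()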

### Group By

In [0]:
newDF.groupby('season').count().show()

In [0]:
count_seasons = newDF.groupby('season').count()
count_seasons.explain()

### Order By

In [0]:
newDF.orderBy(newDF.cnt.desc()).show(5)
# sort by casual (descending), then by cnt (ascending) to break ties
newDF.orderBy(newDF.casual.desc(), newDF.cnt).show(5)

### Add New Column or Replace

In [0]:
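# withColumn takes 2 parameters:
#   1. the name of the column to add (or to replace, if it already exists)
#   2. a Column expression that computes its values

* **hum**: Normalized humidity. The values are divided by 100 (the max), so multiplying by 100 recovers the original scale.
* **windspeed**: Normalized wind speed. The values are divided by 67 (the max), so multiplying by 67 recovers the original scale.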

In [0]:
hum_new_DF = newDF.withColumn("hum_new",newDF.hum * 100)
hum_new_DF.printSchema()
hum_new_DF.show()

In [0]:
windspeed_new_DF = newDF.withColumn("windspeed", newDF.windspeed * 67)
windspeed_new_DF.printSchema()
windspeed_new_DF.select("windspeed").show()

In [0]:
# lit() turns a literal (constant) value into a Column, e.g. to add a constant column to a DataFrame.
# (typedLit() exists only in the Scala API, in org.apache.spark.sql.functions.)

from pyspark.sql.functions import lit

## Replace an existing column with a constant
newDF.withColumn("cnt", lit(10)).show()

## Add a new column computed from an existing one (cnt + 10000)
newDF.select('*', (newDF.cnt + 10000).alias('newCnt')).show()

In [0]:
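# alias(): returns only the selected column, under its new name
newDF.select(col("cnt").alias("totalCnt")).show()
# withColumnRenamed(): keeps all columns, renaming just one
newDF.withColumnRenamed("cnt", "totalCnt").show()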

### Statistical functions

Summary stats:
* Mean
* Standard deviation
* Min
* Max
* Count

For more, see [Statistical-and-mathematical-functions](https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html)


NOTE:
1. `describe()` computes these summary statistics for numerical columns, and it can also be applied to categorical columns.
2. On categorical (StringType) columns, `describe()` outputs:
    - NULL for mean and stddev
    - min and max computed by comparing the strings lexicographically (by character code, i.e. ASCII order for plain English text)
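Plain Python also compares strings character code by character code. For ASCII labels like these, that gives the same order Spark uses for string min/max, so you can predict what `describe()` will report. This is a plain-Python illustration, not a Spark call:

```python
# Plain-Python illustration of lexicographic (character-code) string ordering
seasons = ["spring", "summer", "fall", "winter"]
print(min(seasons), max(seasons))   # fall winter

# Uppercase letters (A-Z = 65-90) sort before lowercase ones (a-z = 97-122)
print(min(["fall", "Summer"]))      # Summer
print(ord("S"), ord("f"))           # 83 102
```

So the "min" season is simply the alphabetically first label, not the earliest season of the year.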

In [0]:
# Summary statistics (mean, standard deviation, min, max, count) of numerical columns
# initialDF.describe().show(5)
initialDF.select("atemp","temp","windspeed").describe().show()

### Pairwise frequency of categorical columns

1. The first column holds the distinct values of the first argument (`weathersit`) and is named `weathersit_season`. The remaining column names are the distinct values of the second argument (`season`).
2. Note: pairs with no occurrences have a count of zero in the contingency table.
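To see what `crosstab` computes, here is a minimal plain-Python contingency table. The helper `crosstab` and the four toy rows are hypothetical and are not taken from the bike dataset. Keys are sorted here for a deterministic result; Spark does not promise that ordering.

```python
from collections import Counter

def crosstab(rows, col1, col2):
    """Plain-Python contingency table: one row per distinct col1 value,
    one column per distinct col2 value, zero for pairs that never occur."""
    pairs = Counter((r[col1], r[col2]) for r in rows)
    row_keys = sorted({r[col1] for r in rows})
    col_keys = sorted({r[col2] for r in rows})
    header = [f"{col1}_{col2}"] + col_keys
    table = [[k] + [pairs.get((k, c), 0) for c in col_keys] for k in row_keys]
    return header, table

# Hypothetical toy rows, not taken from the bike dataset
rows = [
    {"weathersit": "Clear", "season": "summer"},
    {"weathersit": "Clear", "season": "summer"},
    {"weathersit": "Clear", "season": "winter"},
    {"weathersit": "Mist", "season": "summer"},
]
header, table = crosstab(rows, "weathersit", "season")
print(header)        # ['weathersit_season', 'summer', 'winter']
for line in table:
    print(line)      # ['Clear', 2, 1] then ['Mist', 1, 0]
```

The `('Mist', 'winter')` pair never occurs, so its cell is 0 rather than missing.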

In [0]:
# Pairwise frequency of weathersit and season
newDF.crosstab('weathersit','season').show()

In [0]:
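# Frequent values of "season" with support 0.4 (roughly 40% of rows); approximate, may include false positives
newDF.stat.freqItems(["season"], 0.4).show(truncate=False)
# df.stat.approxQuantile(...)
# bloomFilter(...) and countMinSketch(...) exist only in the Scala/Java API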
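`freqItems` trades exactness for speed: its result can contain false positives. For intuition, here is an exact counterpart in plain Python. The helper `freq_items_exact` and the labels are hypothetical. The sketch treats a value as frequent when its share of rows is greater than `support`.

```python
from collections import Counter

def freq_items_exact(values, support):
    """Exact frequent items: values whose share of all rows is greater than
    `support`. A plain-Python stand-in for intuition, not Spark's algorithm."""
    if not 0 < support <= 1:
        raise ValueError("support must be in (0, 1]")
    n = len(values)
    counts = Counter(values)
    return sorted(v for v, c in counts.items() if c / n > support)

# Hypothetical season labels for 10 rows
labels = ["summer"] * 5 + ["fall"] * 3 + ["winter"] * 2
print(freq_items_exact(labels, 0.4))   # ['summer']
print(freq_items_exact(labels, 0.2))   # ['fall', 'summer']
```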

### SQL Queries 

1. Register the DataFrame as a table (temporary view)
2. Use the SparkSession's `spark.sql()` function
3. Returns a new DataFrame

In [0]:
newDF.createOrReplaceTempView('Bike_Prediction_Table')
# initialDF.createOrReplaceTempView('BIKE_PRedictions_OLD')

In [0]:
# sqlContext is the legacy (Spark 1.x) entry point; since Spark 2.0, spark.sql() does the same
spark

In [0]:
spark.sql("select season, cnt from Bike_Prediction_Table").show()
# DataFrame equivalent: newDF.select("season", "cnt").show()

In [0]:
spark.sql("select * from Bike_Prediction_Table where season = 'summer'").show(5)

In [0]:
spark.sql("select * from Bike_Prediction_Table where season = 'summer' and windspeed > 0.4").show()

In [0]:
# maximum rental count (cnt) in each season
max_season_df = spark.sql("select season, max(cnt) as max_cnt from Bike_Prediction_Table group by season")

In [0]:
max_season_df.select("max_cnt").show()

# Data Preprocessing

## Preprocessing the data

So what do we need to do to get our data ready for Machine Learning?

**Recall our goal**: We want to learn to predict the count of bike rentals (the `cnt` column).  We refer to the count as our target "label".

**Features**: What can we use as features to predict the `cnt` label?  

All the columns except `cnt`, and a few exceptions:
* `casual` & `registered`
  * The `cnt` column we want to predict equals the sum of the `casual` + `registered` columns.  We will remove the `casual` and `registered` columns from the data to make sure we do not use them to predict `cnt`.  (**Warning: This is a danger in careless Machine Learning.  Make sure you do not "cheat" by using information you will not have when making predictions**)
* `season` and the date column `dteday`: We could keep them, but they are well-represented by the other date-related columns like `yr`, `mnth`, and `weekday`.
* `holiday` and `weekday`: These features are highly correlated with the `workingday` column.
* row index column `instant`: This is a useless column to us.

## Data Cleaning

### Handling Missing Data

1. Filter Out 
2. Fill In

In [0]:
%sh
cat > movies.csv <<EOF
name,rating,studio,date
Avengers Endgame, 5, marvel,1260759144
Batman Vs Superman,4 ,DC,835355664
The Joker ,, DC,835355681
Frozen,4,Disney,835355604
Hitman,,,
EOF


In [0]:
%sh ls -alh  /databricks/driver/movies.csv


In [0]:
path_to_file = "file:///databricks/driver/movies.csv"

moviesDF = (spark.read          # Our DataFrameReader
  .option("header", "true")      # Let Spark know we have a header
  .option("inferSchema", "true") # Inferring the schema (it is a small dataset)
  .csv(path_to_file)                 # Location of our data
  .cache()                       # Mark the DataFrame as cached.
)


In [0]:
moviesDF.printSchema()

In [0]:
moviesDF.show()

#### 1. Filter OUT

[DROP](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna)

In [0]:
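# drop every row that has a null in ANY column (how="any" is the default)
moviesDF.na.drop().show()

In [0]:
moviesDF.na.drop("all").show()   # drop only rows where ALL columns are null
moviesDF.na.drop("any").show()   # same as na.drop()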

In [0]:
## APPLY to Specific Column
moviesDF.na.drop("all", subset = ['studio','rating']).show()
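The difference between `"any"`, `"all"` and `subset` is easiest to see on the five movie rows. The sketch below is a plain-Python model, not Spark code. `drop_na` and `names` are hypothetical helpers. The rows are movies.csv with whitespace trimmed and empty fields written as `None`. Spark also treats NaN as missing, but this model only checks for `None`.

```python
def drop_na(rows, how="any", subset=None):
    """Plain-Python model of DataFrame.na.drop(how=..., subset=...)."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in cols]
        # "any": drop if at least one checked column is null
        # "all": drop only if every checked column is null
        should_drop = any(nulls) if how == "any" else all(nulls)
        if not should_drop:
            kept.append(row)
    return kept

# movies.csv rows, whitespace trimmed, empty fields as None
movies = [
    {"name": "Avengers Endgame", "rating": 5, "studio": "marvel", "date": 1260759144},
    {"name": "Batman Vs Superman", "rating": 4, "studio": "DC", "date": 835355664},
    {"name": "The Joker", "rating": None, "studio": "DC", "date": 835355681},
    {"name": "Frozen", "rating": 4, "studio": "Disney", "date": 835355604},
    {"name": "Hitman", "rating": None, "studio": None, "date": None},
]

def names(rows):
    return [r["name"] for r in rows]

print(names(drop_na(movies)))          # ['Avengers Endgame', 'Batman Vs Superman', 'Frozen']
print(len(drop_na(movies, how="all"))) # 5: "name" is never null, so no row is all-null
print(names(drop_na(movies, "all", subset=["studio", "rating"])))  # every movie except Hitman
```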

#### 2. Fill In

In [0]:
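## fill(value) only touches columns whose type matches the type of value

moviesDF.na.fill("1").show()               # string value: fills string columns only
moviesDF.na.fill(835355604).show()         # numeric value: fills every numeric column
moviesDF.na.fill("OTHER STUDIO").show()    # string value: fills string columns only
moviesDF.na.fill({'rating': 0.0}).show()   # dict: fills only the named column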

In [0]:
## NOTE: in production, always name the columns you fill (subset= or a dict)
from pyspark.sql.functions import avg

moviesDF.select("rating").describe().show()
moviesDF.select(avg(moviesDF['rating'])).show()   # mean rating
moviesDF.fillna(4.3, subset=['rating']).show()    # fill only "rating" (e.g. with its mean)
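The reason to name columns is that a bare numeric fill value lands in every numeric column. The plain-Python model below shows this on the `Hitman` row. `fill_na` and `SCHEMA` are hypothetical. The schema assumes `rating` and `date` were inferred as numeric; booleans and Spark's NaN handling are not modelled.

```python
# Hypothetical schema for the movies rows; Python types stand in for Spark types
SCHEMA = {"name": str, "rating": int, "studio": str, "date": int}

def fill_na(rows, value, schema=SCHEMA):
    """Plain-Python model of DataFrame.na.fill(value):
    str value         -> fills nulls in string columns only
    int/float value   -> fills nulls in numeric columns only
    dict {col: value} -> fills nulls only in the named columns
    """
    if isinstance(value, dict):
        targets = dict(value)
    elif isinstance(value, str):
        targets = {c: value for c, t in schema.items() if t is str}
    else:
        targets = {c: value for c, t in schema.items() if t in (int, float)}
    return [
        {c: targets[c] if v is None and c in targets else v for c, v in row.items()}
        for row in rows
    ]

hitman = [{"name": "Hitman", "rating": None, "studio": None, "date": None}]
print(fill_na(hitman, "OTHER STUDIO"))    # only "studio" is filled
print(fill_na(hitman, 835355604))         # "rating" AND "date" are both filled
print(fill_na(hitman, {"rating": 0.0}))   # only "rating" is filled
```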

### Remove Trivial Data

Let's drop the columns `instant`, `dteday`, `season`, `casual`, `holiday`, `weekday`, and `registered` from our DataFrame and then review our schema:

In [0]:
preprocessedDF = initialDF.drop("instant", "dteday", "season", "casual", "registered", "holiday", "weekday")

preprocessedDF.printSchema()

### Drop Duplicate Rows

In [0]:
newDF.select('season','windspeed','atemp').count()

In [0]:
newDF.select('season','windspeed','atemp').dropDuplicates().count()


In [0]:
newDF.select('season','windspeed','atemp').dropDuplicates().show()

In [0]:
# distinct() is equivalent to dropDuplicates() with no column subset
newDF.select("season").distinct().show()
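`dropDuplicates()` also accepts a list of columns, in which case only those columns decide what counts as a duplicate. The plain-Python model below illustrates both forms. `drop_duplicates` and the toy rows are hypothetical. The model keeps the first row per key, but Spark does not guarantee which duplicate survives.

```python
def drop_duplicates(rows, subset=None):
    """Plain-Python model of DataFrame.dropDuplicates(subset).
    No subset: whole rows are compared (like distinct()).
    With a subset: one row is kept per distinct combination of those columns.
    This model keeps the FIRST such row; Spark makes no such promise."""
    seen = set()
    kept = []
    for row in rows:
        cols = subset if subset is not None else sorted(row)
        key = tuple(row[c] for c in cols)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept

# Hypothetical toy rows, not taken from the bike dataset
rows = [
    {"season": "summer", "windspeed": 0.2, "atemp": 0.5},
    {"season": "summer", "windspeed": 0.2, "atemp": 0.5},   # exact duplicate
    {"season": "summer", "windspeed": 0.3, "atemp": 0.6},
    {"season": "fall", "windspeed": 0.2, "atemp": 0.5},
]
print(len(drop_duplicates(rows)))                      # 3
print(len(drop_duplicates(rows, subset=["season"])))   # 2
```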