# PySpark Commands Reference
https://spark.apache.org/docs/2.4.4/api/python/index.html


## Connect to Spark Cluster
https://github.com/minrk/findspark

In [None]:
# findspark is used to make the local Spark installation importable as `pyspark`
# (see the findspark link above). init() is called without arguments, so it must
# locate Spark on its own — presumably via SPARK_HOME; confirm for your setup.
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Reuse an existing SparkSession if one is active; otherwise build a new one
# registered under this notebook's application name.
spark = (
    SparkSession.builder
    .appName("pyspark-df-overview")
    .getOrCreate()
)

In [None]:
# Display the version of Spark this session runs on (the API docs linked above are for 2.4.4).
spark.version

### Create Spark DataFrame
Dataset from: https://www.kaggle.com/uciml/adult-census-income/home

In [None]:
# First read without a schema: schema inference is off by default, so every column
# comes back as a string. The header option takes column names from the first line.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("/dataset/pyspark-df-overview/census_income.csv.gz")
)
df.printSchema()

### Define a schema

In [None]:
import pyspark.sql.types as t

# Explicit schema for the census CSV; every field is nullable. The fields are listed
# in the order the CSV columns are expected to appear, because the reader applies the
# schema by position rather than by header name.
census_schema = t.StructType([
    t.StructField(column_name, column_type, nullable=True)
    for column_name, column_type in (
        ('age', t.IntegerType()),
        ('workclass', t.StringType()),
        ('fnlwgt', t.IntegerType()),
        ('education', t.StringType()),
        ('education-num', t.IntegerType()),
        ('marital-status', t.StringType()),
        ('occupation', t.StringType()),
        ('relationship', t.StringType()),
        ('race', t.StringType()),
        ('sex', t.StringType()),
        ('capital-gain', t.DoubleType()),
        ('capital-loss', t.DoubleType()),
        ('hours-per-week', t.IntegerType()),
        ('native-country', t.StringType()),
        ('label', t.StringType()),
    )
])

In [None]:
# The gzipped CSV is read directly; Spark handles the compressed payload itself.
# Supplying census_schema gives typed columns instead of all strings.
df = (
    spark.read
    .schema(census_schema)
    .option("header", True)
    .csv("/dataset/pyspark-df-overview/census_income.csv.gz")
)
df.printSchema()

In [None]:
# Number of rows loaded (an action, so this triggers a Spark job).
df.count()

### Drop unused column

In [None]:
# fnlwgt is not used anywhere else in this notebook, so remove it.
# drop() returns a new DataFrame, hence the reassignment to df.
df = df.drop('fnlwgt')
df.printSchema()

### A few operations

In [None]:
from pyspark.sql.functions import count, avg, desc

# Rows per education level together with the average age, most common level first.
(
    df.groupBy('education')
    .agg(
        count('*').alias('qty'),
        avg('age').alias('avg_age'),
    )
    .orderBy(desc('qty'))
    .show()
)

### Using SQL
The same aggregation, expressed in SQL syntax.

In [None]:
# Register the DataFrame as a temporary view so SQL can refer to it as "census".
df.createOrReplaceTempView("census")
# Mirrors the DataFrame-API aggregation above, including its ordering: without an
# ORDER BY clause, SQL does not guarantee any particular order of the groups.
s = spark.sql("""
SELECT 
    education, 
    COUNT(*) AS qty, 
    AVG(age) AS avg_age
FROM census
GROUP BY education
ORDER BY qty DESC
""")
s.show()

In [None]:
# A transformation can be wrapped in a function and reused for different fields.
def my_query(field, data=None):
    """Group rows by `field`, counting them and averaging age, most frequent first.

    field: name of the column to group by.
    data: DataFrame to query; when omitted, the notebook-level `df` is used.
    Returns a Spark DataFrame with columns `field`, `qty` and `avg_age`; no action
    is triggered here, so call .show()/.collect() on the result to run it.
    """
    source = df if data is None else data
    return (
        source.groupBy(field)
        .agg(
            count('*').alias('qty'),
            avg('age').alias('avg_age'),
        )
        .orderBy(desc('qty'))
    )


# show() prints the table itself and returns None, so it is not wrapped in print().
my_query('workclass').show()

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.
df.describe('age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week').show()

In [None]:
# describe() also works on string columns; for categorical data mainly count, min and max are informative.
df.describe(['workclass', 'education', 'marital-status']).show()

In [None]:
# Approximate frequent values of marital-status (possibly with false positives);
# support=0.01 is the default threshold, spelled out here for clarity.
df.freqItems(['marital-status'], support=0.01).show(truncate=False)

In [None]:
from pyspark.sql.functions import col

# Contingency table of age vs. label. crosstab names its first column "<col1>_<col2>"
# ("age_label") and stores the age values there as strings, so sorting on it directly
# is lexicographic; cast to int so ages come out in numeric order.
df.crosstab('age', 'label').orderBy(col('age_label').cast('int')).show()


In [None]:
# Rows per native country, least frequent first. The dict-style agg names its output
# column "count(native-country)"; count(<column>) skips nulls, so a null group shows 0.
(
    df.groupBy('native-country')
    .agg({'native-country': 'count'})
    .orderBy('count(native-country)')
    .show()
)

### Check for missing data

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# Columns to inspect; replace with df.columns to check every column.
cols = ['workclass', 'education-num', 'occupation', 'hours-per-week', 'native-country']

# Per column, count rows that are null or NaN: when() without otherwise() yields null
# for non-matching rows, and count() ignores nulls.
# Technique from https://stackoverflow.com/a/44631639/570393
df.select([
    count(when(col(name).isNull() | isnan(name), name)).alias(name)
    for name in cols
]).show()

### Remove rows with missing data

In [None]:
# Compare the full row count with the count left after dropping every row that
# has a null/NaN in any column.
print('total rows: {}'.format(df.count()))
print('only complete rows: {}'.format(df.dropna().count()))

### Fill rows that contain missing data

In [None]:
def show_df(df, field='occupation'):
    """Print the number of rows for each distinct value of `field` in `df`.

    df: DataFrame to summarise (the parameter, not the notebook-level `df`).
    field: column to group by; defaults to 'occupation'.
    """
    per_value_counts = df.groupBy(field).count()
    per_value_counts.show()

In [None]:
# Occupation counts before filling; rows with a missing occupation, if any, appear under a null key.
show_df(df, field='occupation')

In [None]:
# Fill with a fixed value
new_df = df.fillna({'occupation': 'Other-service'})

# Count 
show_df(new_df)

### A better way

Calculate the `mean()` of a numeric column and use it to fill that column's missing values.
For categorical data, fill with a fixed string instead.

In [None]:
from pyspark.sql.functions import mean

# Global (ungrouped) mean of hours-per-week over the whole DataFrame.
df.agg(mean('hours-per-week').alias('hours-per-week')).show()

In [None]:
from pyspark.sql.functions import mean
# Kept from the original cell; pandas itself is required later by toPandas() and plotting.
import pandas as pd

# Build the fillna() mapping: the column mean for the numeric column and a fixed
# category for the categorical one. The one-row aggregate is read with
# first().asDict(), giving a plain Python float without a pandas round-trip.
data_to_fill = (
    df.agg(mean('hours-per-week').alias('hours-per-week'))
    .first()
    .asDict()
)

# Simple Python dict update: add the categorical replacement value.
data_to_fill.update({'occupation': 'Other-service'})

# NOTE(review): hours-per-week is declared IntegerType in census_schema, so the
# fractional mean is presumably cast to an integer when fillna applies it — confirm
# if exact rounding matters.
data_to_fill

In [None]:
# Apply both fills in one pass, then inspect the two affected columns.
df.na.fill(data_to_fill).select('hours-per-week', 'occupation').show(n=50)

### Creating charts with pandas & matplotlib
https://pandas.pydata.org/pandas-docs/stable/api.html#api-dataframe-plotting

**Important:** this is only possible once the data is small enough to fit in the driver program's memory.

In [None]:
# Distributed part: count rows per workclass on the cluster, smallest group first.
df_spark = (
    df.groupBy('workclass')
    .agg(count('*').alias('counts'))
    .orderBy('counts')
)

# Driver part: collect the small aggregated result into a pandas DataFrame.
df_wk = df_spark.toPandas()

In [None]:
# Check Pandas DF content
df_wk

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

df_wk.plot.bar(x='workclass', y='counts', figsize=(20,6));

### Stop the Driver Program
Release the resources held on the Spark cluster.

In [None]:
# Stop the SparkSession's underlying SparkContext; create a new session before running more Spark code.
spark.stop()

# Continue Learning

* [Kaggle Learn](https://www.kaggle.com/learn/overview)
* [PySpark Cookbook](https://www.safaribooksonline.com/library/view/pyspark-cookbook/9781788835367/)

## Other references

* [PySpark Tutorial for Beginners: Machine Learning Example](https://www.guru99.com/pyspark-tutorial.html)