<img style="float: right;" src="http://minneanalytics.org/wp/wp-content/uploads/2018/04/BDT18_LP-02-02.jpg" />

# PySpark Simple Start with Jupyter

## IBM open-source PixieDust notebook Spark kernel
- [IBM PixieDust](https://github.com/ibm-watson-data-lab/pixiedust)

In [None]:
import pixiedust

## Enable the Spark job progress monitor (not yet working in JupyterLab)

In [None]:
# Turn on PixieDust's Spark job progress monitor for subsequent Spark cells.
pixiedust.enableSparkJobProgressMonitor()

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Input data: a gzip-compressed (.gz) CSV file on the local filesystem (file:// scheme).
csvPath = "file:///media/sf_mnlytics/data/fake_customers_100.csv.gz"

## Read the CSV file using the PySpark API

In [None]:
# Load the tab-separated customer file; the first line holds the column names.
# With inferSchema=False Spark does not sample the data, so every column is
# read as a string.
df = spark.read.csv(
    csvPath,
    sep="\t",
    header=True,
    inferSchema=False,
    multiLine=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
)

#### Print the Spark DataFrame schema

In [None]:
# Print the column names and types Spark assigned when reading the CSV.
df.printSchema()

### Feature transformation — problem: zipcodes lost their leading zeros (the source data appears to have stored them as integers), so they must be repaired and kept as strings
- if null, then use "99999"
- if length >= 5, then use as is
- if length < 5, pad with zeros on the left
- drop the current zipcode column
- rename the computed_0 column to zipcode

In [None]:
# Repair zipcodes into the temporary column "computed_0":
#   - null              -> "99999"
#   - shorter than 5    -> left-padded with zeros to 5 characters
#   - length 5 or more  -> kept unchanged
# lpad is applied only to short values because lpad also *truncates* strings
# longer than the target length, which would corrupt longer codes.
zip_col = col("zipcode")
transformation_0 = df.withColumn(
    "computed_0",
    when(zip_col.isNull(), lit("99999"))
    .when(length(zip_col) < 5, lpad(zip_col, 5, "0"))
    .otherwise(zip_col)
    .cast(StringType()),
)

# Replace the original column with the repaired one under the original name.
transformation_1 = transformation_0.drop("zipcode")
transformation_2 = transformation_1.withColumnRenamed("computed_0", "zipcode")

# Spot-check: a few repaired zipcodes that now carry a leading zero.
transformation_2\
    .select("zipcode")\
    .filter(col("zipcode").startswith("0"))\
    .show(n=5)

### Use `describe` on the subset of columns with numeric data types

In [None]:
# Select numeric columns by their Spark DataType class rather than by dtype
# strings: DataFrame.dtypes reports LongType as "bigint" (never "long"), and
# a string list easily misses float/smallint/tinyint/decimal columns.
numeric_types = (ByteType, ShortType, IntegerType, LongType,
                 FloatType, DoubleType, DecimalType)
numeric_columns = [field.name
                   for field in transformation_2.schema.fields
                   if isinstance(field.dataType, numeric_types)]

# DataFrame.describe() with no column arguments summarises every numeric and
# string column, so make that fallback explicit instead of triggering it
# silently with an empty selection.
if numeric_columns:
    transformation_2.describe(*numeric_columns).show()
else:
    print("No numeric columns found; describing all columns instead.")
    transformation_2.describe().show()

### drop a column then sort on zipcode

In [None]:
# Remove the full credit-card column, then order rows by ascending zipcode.
transformation_3 = transformation_2.drop("credit_card_full").orderBy(col("zipcode").asc())

In [None]:
# First 10 rows, with cell values truncated to 20 characters.
transformation_3.show(10, 20)

### Create an in-memory temporary SQL view

In [None]:
# Register the DataFrame as the temporary view "customers" for Spark SQL.
# createOrReplaceTempView (unlike createTempView) does not raise when the view
# already exists, so this cell can be re-run safely.
transformation_3.createOrReplaceTempView("customers")
sqlContext.tableNames()

### Query the in-memory view using Spark SQL

In [None]:
# Run a Spark SQL query against the temporary view: 10 rows, values cut to 10 characters.
spark.sql("select * from customers").show(10, 10)

### Drop the in-memory temporary view

In [None]:
# Remove the temporary view through the SparkSession catalog (the same API the
# Hive-table cells use) instead of the legacy SQLContext.dropTempTable.
# dropTempView just returns False when the view does not exist.
spark.catalog.dropTempView("customers")
sqlContext.tableNames()

### Persist the DataFrame to the Hadoop file system (HDFS) in Parquet format

In [None]:
%%capture dummy
_ = transformation_3\
    .write\
    .mode("overwrite")\
    .parquet("/data/mnlytics.parquet")

### Use the `hdfs` command-line utility to list the Hadoop file system

In [None]:
# Shell out to the HDFS CLI to list /data and confirm the Parquet output is there.
!hdfs dfs -ls /data

### Read the Parquet data back into a Spark DataFrame

In [None]:
# Load the Parquet output back and count its rows.
df_parquet = spark.read.format("parquet").load("/data/mnlytics.parquet")
df_parquet.count()

#### Popular Python libraries

In [None]:
import numpy as np
import pandas as pd
from pandas.io import sql
import matplotlib.style
import matplotlib as mpl
import matplotlib.pyplot as plt

#### set matplotlib styles

In [None]:
# Names of the style sheets available to matplotlib.
plt.style.available

In [None]:
# Apply the ggplot look to every subsequent matplotlib figure.
_ = plt.style.use("ggplot")

# Create a pandas dataframe from a spark dataframe

In [None]:
%%capture dummy
# Collect the Parquet-backed Spark DataFrame into a local pandas DataFrame.
# toPandas() brings every row to the driver, so it only suits small data.
# The capture magic stores any printed output in `dummy` instead of showing it.
df_pandas = df_parquet.toPandas()

In [None]:
# Preview the first rows of the pandas copy.
df_pandas.head()

-

### Group, filter, then plot the pandas DataFrame
- group by job and count the rows per job
- keep jobs with count > 65 using a lambda expression
- drop the NaN entries left behind by the filter
- plot using matplotlib

In [None]:
# Jobs with more than this many rows are plotted.
MIN_JOB_COUNT = 65

# Count rows per job. `where` turns counts at or below the threshold into NaN,
# and `dropna` removes them. Plot the remaining counts as horizontal bars.
ax = (
    df_pandas
    .groupby("job")
    .size()
    .where(lambda count: count > MIN_JOB_COUNT)
    .dropna()
    .plot(kind="barh", figsize=(20, 10), title="Top Jobs")
)
ax.set(xlabel="count", ylabel="job");

# Save the DataFrame to a permanent Hive table

In [None]:
# List the databases known to the Spark catalog.
spark.catalog.listDatabases()

In [None]:
# Show the Spark session's current database.
spark.catalog.currentDatabase()

In [None]:
# Remove all cached tables from Spark's in-memory cache.
spark.catalog.clearCache()

In [None]:
# Save df_parquet as the catalog table "customer", replacing it if it already exists.
df_parquet.write.saveAsTable("customer", mode="overwrite")

In [None]:
# List the tables and views in the current database.
spark.catalog.listTables()

## Query the Hive table

In [None]:
# Query the saved table with Spark SQL and show the first 5 rows.
spark.sql("select * from customer").show(5)

In [None]:
# Drop the saved table. IF EXISTS keeps the cell re-runnable. A DROP statement
# returns no rows, so there is nothing to .show(); the trailing ';' suppresses
# the empty DataFrame repr.
spark.sql("DROP TABLE IF EXISTS customer");

In [None]:
# Confirm that the "customer" table no longer appears in the catalog.
spark.catalog.listTables()