In [13]:
%config Completer.use_jedi = False

# Spark In Conda Installation !!!

`Make sure this step is done correctly`  

`prerequisites`
- anaconda apps
- jupyter/lab
- conda environment variables in the system

`Steps`
1. download spark at https://spark.apache.org/downloads.html

    - make sure you pick correct version of Spark and compatible with following entities: Pyspark, Hadoop, Winutils & Hadoop.dll, Java, Python


2. add conda env. variables in system path

    - which point at C:\bla\cla\Anaconda3\Scripts


3. Setup conda Python interpreter exe directory in Path environment variables

    - which point at C:\bla\cla\Anaconda3


4. add SPARK_HOME environment variables

    - which point at C:\bla\Spark\spark-3.1.2-bin-hadoop3.2


5. add HADOOP_HOME environment variables which point at

    - C:\bla\Hadoop
    - NOTE : spark need some of hadoop properties like winutils.exe and hadoop.dll, download at https://github.com/steveloughran/winutils


6. add JAVA_HOME environment variables which point at

    - C:\Program Files\Java\jdk1.8.0_301
    - NOTE : USE JAVA JDK not JRE


7. Add to Path environment variables the \bin directory of (Hadoop,Spark,Java,Python)  
8. add PYTHONPATH environment varibales which point to spark pyhton py4j
9. add PYSPARK_DRIVER_PYTHON_OPTS and set the value to 'notebook'
10. add PYSPARK_DRIVER_PYTHON and set the value to jupyter
11. copy hadoop.dll to C:\Windows\System32

                            -- checkpoint --
NOTE : At this point, spark can run properly in CMD/spark shell

12. install pyspark using pip

    - pip install pyspark
    - NOTE : you may encounter some failure due to required other dependencies, install them to meet pyspark requirements dependencies

13. install findspark using pip
    - pip install findspark
    - call the mehtod in production code


14. DONE ! Spark can run at standalone mode

# Spark Journey


## import necessary dependencies

In [1]:
import findspark

findspark.init()
findspark.find()
import pyspark
import numpy as np
import pandas as pd
import json
import time
import psycopg2
#koalas module: pandas like interface to integrate with spark dataframe
import databricks.koalas as ks

#function for aggregation
from pyspark.sql import functions as F
from pyspark.sql.functions import sum,min,max,avg,count

#spark configuration 
from pyspark import SparkConf

#spark streaming dependencies
from pyspark.sql.types import StructField,StructType,StringType,IntegerType,DoubleType,DateType
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.sql.streaming import DataStreamReader



## Data Sources

In [2]:
data = r"D:\Data Engineer\Project\portofolio dashboard\brazil chainstore\Sales Report.csv"

## Spark Configuration

In [3]:
conf = SparkConf().setAll([
    ("spark.driver.memory","4g"),
    ("spark.executor.memory","6g"),
    ("spark.executor.memoryOverhead","1g"),
    ("spark.master","local[*]"),
    ("spark.executor.cores",4),
    ("spark.executor.instances",4),
    ("spark.default.parallelism",16),
    ("spark.task.cpus",2)
])
#spark.conf.set("spark.executor.instances", 4)
#spark.conf.set("spark.executor.cores", 4)
#avaiable properties at : https://spark.apache.org/docs/latest/configuration.html

## Spark Initialization

In [4]:
spark = pyspark.sql.SparkSession.builder.config(conf=conf).appName("learnspark").getOrCreate()
#initialize session

In [5]:
spark
#spark.stop()
#spark sesion

## Spark Playground

### Read Data

In [6]:
sdf = spark.read.csv(data,header=True,sep=';',inferSchema=True)
#infer schema = read column values data type if none/false will be string default

### Data Inspection

In [7]:
sdf.show(5)
#sdf.take(5)
#sdf.collect()
#sdf.head()

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|     Debit Card|
|       39000|          38|      Stacy D

In [9]:
#integrate to koalas (pandas like dataframe)
spdf = sdf.to_koalas()
spdf.head()

Unnamed: 0,Company Code,Order Number,Employee,Product,Product Category,Client,Client City,Sale Date Time,Product Cost,Discount Amount,Amount,Total,Form of payment
0,39000,12,Stacy Day,Special Gasoline,Fuel,Customer not informed,No City,2017-03-31 04:10:00,3.05,0.0,5.642,20.02,Money
1,39000,21,Olive Stevenson,Special Diesel,Fuel,Customer not informed,No City,2017-03-31 04:13:00,2.51,0.0,125.045,350.0,Debit Card
2,39000,38,Stacy Day,Special Diesel,Fuel,Customer not informed,No City,2017-03-31 04:25:00,2.51,0.0,35.699,99.92,Money
3,39000,39,Olive Stevenson,Lubricant 1108,Lubricant,Customer not informed,No City,2017-03-31 04:26:00,7.409,0.0,1.0,13.0,Money
4,39000,39,Olive Stevenson,Diesel Auto Clean,Fuel,Customer not informed,No City,2017-03-31 04:26:00,2.56,0.0,42.162,120.96,Money


In [11]:
type(sdf)

pyspark.sql.dataframe.DataFrame

In [11]:
type(spdf)

databricks.koalas.frame.DataFrame

In [12]:
sdf.printSchema()
#get info similiar to df.info

root
 |-- Company Code: integer (nullable = true)
 |-- Order Number: integer (nullable = true)
 |-- Employee: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Client: string (nullable = true)
 |-- Client City: string (nullable = true)
 |-- Sale Date Time: string (nullable = true)
 |-- Product Cost: double (nullable = true)
 |-- Discount Amount: double (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- Form of payment: string (nullable = true)



In [12]:
sdf.dtypes

[('Company Code', 'int'),
 ('Order Number', 'int'),
 ('Employee', 'string'),
 ('Product', 'string'),
 ('Product Category', 'string'),
 ('Client', 'string'),
 ('Client City', 'string'),
 ('Sale Date Time', 'string'),
 ('Product Cost', 'double'),
 ('Discount Amount', 'double'),
 ('Amount', 'double'),
 ('Total', 'double'),
 ('Form of payment', 'string')]

In [13]:
#similar to df.describe
sdf.describe().to_koalas()

Unnamed: 0,summary,Company Code,Order Number,Employee,Product,Product Category,Client,Client City,Sale Date Time,Product Cost,Discount Amount,Amount,Total,Form of payment
0,count,26951165.0,26951165.0,26951165,26951165,26951165,26951165,26951165,26951165,26951165.0,26951165.0,26951165.0,26951165.0,26951165
1,mean,4451263.587937664,1385973.6962759125,,,,,,,3.738021302157949,0.0032230688357998,17.4196142442836,66.21832481241184,
2,stddev,4284039.359860178,607761.4532924617,,,,,,,2.700789823747017,0.6349943262928652,28.35413206226764,93.98091956167043,
3,min,39000.0,8.0,Aaron Wolfe,Accessories 10021,Accessories,Aaron Cruz,Abladwood,2017-03-31 04:10:00,-1.794,0.0,0.003,0.01,Check
4,max,14740000.0,2391978.0,Zachary Vaughn,Water 9993,Water,Zachary Underwood,Zumont,2020-04-02 23:58:00,409.0,803.4,3257.7990000000004,9999.0,Pre-paid check / Pre- date check


In [24]:
#show distinct values
sdf.select(['Product Category']).distinct().show()

+--------------------+
|    Product Category|
+--------------------+
|          The Bakery|
|               Water|
|             Pickets|
|        Extinguisher|
|           Perfumery|
|Cold Alcoholic Be...|
|           Flavoring|
|Dairy Products An...|
|              Energy|
| Derivatives Peanuts|
|           Batteries|
|Hot Alcoholic Bev...|
|Popsicle And Ice ...|
| Sweet Coconut James|
|      Salted Biscuit|
|             Cereals|
|          Cigarettes|
|              Condom|
|               Chips|
|Chewing Gum And C...|
+--------------------+
only showing top 20 rows



In [26]:
#count distinct
sdf.select('Product Category').distinct().count()

39

In [34]:
sdf.select('Client').distinct().orderBy('Client').show(truncate = False)

+----------------+
|Client          |
+----------------+
|Aaron Cruz      |
|Aaron Hampton   |
|Aaron Howell    |
|Aaron Nguyen    |
|Aaron Washington|
|Abel Haynes     |
|Abel Howard     |
|Abel Ingram     |
|Abel Jensen     |
|Abel Montgomery |
|Abel Snyder     |
|Abel Stevenson  |
|Abel Walton     |
|Abraham Austin  |
|Abraham Banks   |
|Abraham Chavez  |
|Abraham Dean    |
|Abraham Dixon   |
|Abraham Dunn    |
|Abraham Gilbert |
+----------------+
only showing top 20 rows



In [27]:
#count
sdf.select('Product').count()

26951165

In [83]:
#inspect null values
sdf.to_koalas().isnull().sum()

Company Code        0
Order Number        0
Employee            0
Product             0
Product Category    0
Client              0
Client City         0
Sale Date Time      0
Product Cost        0
Discount Amount     0
Amount              0
Total               0
Form of payment     0
dtype: int64

In [68]:
#null value inspection can be done with either of these ways

sdf.filter("Client IS NULL OR Employee IS NULL").show()
#sdf.filter(sdf.Product.isNull()).show()
#sdf.filter("'Client City' IS NULL").show()

+------------+------------+--------+-------+----------------+------+-----------+--------------+------------+---------------+------+-----+---------------+
|Company Code|Order Number|Employee|Product|Product Category|Client|Client City|Sale Date Time|Product Cost|Discount Amount|Amount|Total|Form of payment|
+------------+------------+--------+-------+----------------+------+-----------+--------------+------------+---------------+------+-----+---------------+
+------------+------------+--------+-------+----------------+------+-----------+--------------+------------+---------------+------+-----+---------------+



### indexing and slicing spark

In [28]:
# select clause
employee = sdf.select(['Employee','Product'])
#employee.show()

#perform simple transformation on select
sdf.select(sdf['Product'],(sdf['Product Cost'] + 10).alias('New Product Cost')).show(5)

+-----------------+----------------+
|          Product|New Product Cost|
+-----------------+----------------+
| Special Gasoline|           13.05|
|   Special Diesel|           12.51|
|   Special Diesel|           12.51|
|   Lubricant 1108|          17.409|
|Diesel Auto Clean|           12.56|
+-----------------+----------------+
only showing top 5 rows



In [50]:
# where clause 
sdf.select('Order Number','Total','Form of payment').filter(sdf['Form of payment'] == 'Money').show(5)

+------------+------+---------------+
|Order Number| Total|Form of payment|
+------------+------+---------------+
|          12| 20.02|          Money|
|          38| 99.92|          Money|
|          39|  13.0|          Money|
|          39|120.96|          Money|
|          40|  9.72|          Money|
+------------+------+---------------+
only showing top 5 rows



In [66]:
# aggregation, column renaming, sort, koalas convertation
sdf.groupBy('Form of payment').sum('Total').withColumnRenamed("sum(Total)", "Total").orderBy('sum(Total)').show() #to_koalas()

+--------------------+--------------------+
|     Form of payment|               Total|
+--------------------+--------------------+
|              Others|   97020.54999999999|
|               Check|  3174994.0000000047|
|Pre-paid check / ...|1.1391351160000129E7|
|Extended payment ...|1.0545461156999865E8|
|          Debit Card| 3.358544106939356E8|
|         Credit Card| 3.911465130299236E8|
|               Money| 9.375420970382955E8|
+--------------------+--------------------+



In [31]:
# aggregation, sorting, column renaming, converting to koalas
sdf.groupBy('Form of payment').agg({'Total':'sum'}).withColumnRenamed("sum(Total)", "Total").orderBy('Total',ascending=False).to_koalas()

Unnamed: 0,Form of payment,Total
0,Money,937542100.0
1,Credit Card,391146500.0
2,Debit Card,335854400.0
3,Extended payment plan/ In installments,105454600.0
4,Pre-paid check / Pre- date check,11391350.0
5,Check,3174994.0
6,Others,97020.55


In [32]:
#aggregation, column renaming, sorting, koalas converting
sdf.groupBy('Employee').agg(sum('Total').alias('Sales'),avg('Total').alias('Average Sales')).orderBy("Average Sales",ascending=False).to_koalas().head()

Unnamed: 0,Employee,Sales,Average Sales
0,Maurice Bell,1916144.98,151.617739
1,Patrick Lloyd,3288773.61,144.269767
2,Lucy Cox,2640205.08,144.045233
3,Ernest Figueroa,2494723.32,139.877955
4,Lula Ford,1260200.04,136.414813


In [30]:
#replace value with value
sdf.withColumn('Employee', \
               F.when(sdf['Employee'] == 'Stacy Day', 'Kevin Elfri') \
               .otherwise(sdf['Employee'])).show(5)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|       39000|          12|    Kevin Elfri| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|     Debit Card|
|       39000|          38|    Kevin Elf

In [38]:
# replace known null format to np.NaN
sdf.withColumn('Client', F.when(sdf['Client'] == 'Customer not informed',np.NaN)).show(5)

+------------+------------+---------------+-----------------+----------------+------+-----------+-------------------+------------+---------------+-------+------+---------------+
|Company Code|Order Number|       Employee|          Product|Product Category|Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|
+------------+------------+---------------+-----------------+----------------+------+-----------+-------------------+------------+---------------+-------+------+---------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|   NaN|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|   NaN|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|     Debit Card|
|       39000|          38|      Stacy Day|   Special Diesel|            Fuel|   NaN|    No City|2017-03-31 04

### Drop Null

In [33]:
sdf.na.drop().show(5)
#specify column name in the drop argument

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|     Debit Card|
|       39000|          38|      Stacy D

### Filling Null Values

In [36]:
#filling null values

sdf.fillna('some value').show(3)
#specify column by subset argument

+------------+------------+---------------+----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+-----+---------------+
|Company Code|Order Number|       Employee|         Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount|Total|Form of payment|
+------------+------------+---------------+----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+-----+---------------+
|       39000|          12|      Stacy Day|Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642|20.02|          Money|
|       39000|          21|Olive Stevenson|  Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045|350.0|     Debit Card|
|       39000|          38|      Stacy Day|  Speci

### Column Ops

In [37]:
#drop column
sdf.drop("Product").show(5)

+------------+------------+---------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|Company Code|Order Number|       Employee|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|
+------------+------------+---------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+
|       39000|          12|      Stacy Day|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|
|       39000|          21|Olive Stevenson|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|     Debit Card|
|       39000|          38|      Stacy Day|            Fuel|Customer not info...|    No City|2017-03-31 04:25:00|        2.51|    

In [38]:
sdf.dropDuplicates().show(5)
#specify the column for drop if the specified column has duplicate in common

+------------+------------+------------------+----------------+----------------+--------------------+-----------+-------------------+------------------+---------------+------------------+-----+---------------+
|Company Code|Order Number|          Employee|         Product|Product Category|              Client|Client City|     Sale Date Time|      Product Cost|Discount Amount|            Amount|Total|Form of payment|
+------------+------------+------------------+----------------+----------------+--------------------+-----------+-------------------+------------------+---------------+------------------+-----+---------------+
|       39000|         418|     Rosie Mullins|Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 10:04:00|              3.05|            0.0|             5.636| 20.0|          Money|
|       39000|        1445|        Guy Rivera|Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 12:36:00|              3.05|        

In [16]:
# adding new column with constant value
sdf.withColumn('author',F.lit('Kevin')).show(5)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|author|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money| Kevin|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|     Debit Card| Kevin|
|    

In [39]:
#adding column from existing column
sdf.withColumn("5% Tax",sdf['Total']*0.05).show(5)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+------------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|            5% Tax|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+------------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|1.0010000000000001|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            

In [17]:
#adding column from concatenation of existing column 
sdf.withColumn('complete product',F.concat_ws(',','Product','Product Category')).show(5)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+--------------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|    complete product|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+--------------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|Special Gasoline,...|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|    

In [19]:
#adding column with condition
sdf.withColumn("Product Classification",
               F.when(sdf["Product Cost"]<3, F.lit("cheap")) \
               .when((sdf["Product Cost"]>= 3) & (sdf["Product Cost"]<7),F.lit("average")) \
               .otherwise(F.lit("expensive"))
              ).show(20) #.to_koalas().head(20)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------------+---------------+------------------+------+--------------------+----------------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|      Product Cost|Discount Amount|            Amount| Total|     Form of payment|Product Classification|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------------+---------------+------------------+------+--------------------+----------------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|              3.05|            0.0|             5.642| 20.02|               Money|               average|
|       39000|          21|Olive Stevenson|   Specia

In [37]:
#adding column from sparks builtin function
sdf.withColumn("today", current_date()).show(5)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+----------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|     today|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+----------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|2021-10-13|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|     Debit Ca

In [21]:
sdf.withColumn("current timestamp",F.current_timestamp()).show(5)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+--------------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Form of payment|   current timestamp|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+---------------+--------------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|          Money|2021-12-13 15:03:...|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|    

In [42]:
#rename column
sdf.withColumnRenamed("Form of payment","Payment Type").show(5)

+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+------------+
|Company Code|Order Number|       Employee|          Product|Product Category|              Client|Client City|     Sale Date Time|Product Cost|Discount Amount| Amount| Total|Payment Type|
+------------+------------+---------------+-----------------+----------------+--------------------+-----------+-------------------+------------+---------------+-------+------+------------+
|       39000|          12|      Stacy Day| Special Gasoline|            Fuel|Customer not info...|    No City|2017-03-31 04:10:00|        3.05|            0.0|  5.642| 20.02|       Money|
|       39000|          21|Olive Stevenson|   Special Diesel|            Fuel|Customer not info...|    No City|2017-03-31 04:13:00|        2.51|            0.0|125.045| 350.0|  Debit Card|
|       39000|          38|      Stacy Day|   Special D

In [22]:
#map operation
sdf.rdd.map(lambda x: (x.Client + "," + x['Client City'],)).toDF(['Client']).show(5)
#rename column in toDF argument
# other options to refer the column : x.Client, x['Client'], x[5]

+--------------------+
|              Client|
+--------------------+
|Customer not info...|
|Customer not info...|
|Customer not info...|
|Customer not info...|
|Customer not info...|
+--------------------+
only showing top 5 rows



In [72]:
sdf.rdd.map(lambda x: (x['Product Cost'] + (x['Product Cost']*0.1),)).toDF(['Invlation Price']).show(5)

+------------------+
|   Invlation Price|
+------------------+
|             3.355|
|2.7609999999999997|
|2.7609999999999997|
|            8.1499|
|             2.816|
+------------------+
only showing top 5 rows



## Spark IO (write to SQL Server)

In [1]:
#sql server properties
server_name = "jdbc:sqlserver://12345678910"
database_name = "custom_dataset"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "big_data_demo"
username = "admin"
password = "admin" 

In [2]:
url

'jdbc:sqlserver://12345678910;databaseName=custom_dataset;'

In [59]:
try:
    sdf.write.mode("overwrite") \
        .format("jdbc") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url",url) \
        .option("dbtable",table_name) \
        .option("user",username) \
        .option("password",password) \
        .save()
    
except ValueError as error :
    print("Connector write failed", error)

In [60]:
print('write complete')

write complete


# Spark Dataframe to Koalas/Pandas Dataframe

In [None]:
kdf = sdf.to_koalas()

In [None]:
kdf.head()

Unnamed: 0,Company Code,Order Number,Employee,Product,Product Category,Client,Client City,Sale Date Time,Product Cost,Discount Amount,Amount,Form of payment,after tax
0,39000,12,Stacy Day,Special Gasoline,Fuel,Customer not informed,No City,2017-03-31 04:10:00,3.05,0.0,5.642,Money,22.022
1,39000,21,Olive Stevenson,Special Diesel,Fuel,Customer not informed,No City,2017-03-31 04:13:00,2.51,0.0,125.045,Debit Card,385.0
2,39000,38,Stacy Day,Special Diesel,Fuel,Customer not informed,No City,2017-03-31 04:25:00,2.51,0.0,35.699,Money,109.912
3,39000,39,Olive Stevenson,Lubricant 1108,Lubricant,Customer not informed,No City,2017-03-31 04:26:00,7.409,0.0,1.0,Money,14.3
4,39000,39,Olive Stevenson,Diesel Auto Clean,Fuel,Customer not informed,No City,2017-03-31 04:26:00,2.56,0.0,42.162,Money,133.056


In [46]:
def inspect_null(df):
    na_val = df.isnull().sum() * 100/ len(df)
    na_val = na_val[na_val >0].sort_values(ascending=False)
    return na_val

In [47]:
inspect_null(kdf)

Series([], dtype: float64)

In [48]:
kdf.describe()

Unnamed: 0,Company Code,Order Number,Product Cost,Discount Amount,Amount,after tax
count,26951160.0,26951160.0,26951160.0,26951160.0,26951160.0,26951160.0
mean,4451264.0,1385974.0,3.738021,0.003223069,17.41961,72.84016
std,4284039.0,607761.5,2.70079,0.6349943,28.35413,103.379
min,39000.0,8.0,-1.794,0.0,0.003,0.011
25%,882000.0,912516.0,3.391,0.0,4.916,22.0
50%,3234000.0,1410865.0,3.61,0.0,11.908,55.0
75%,10019000.0,1905212.0,3.774,0.0,23.703,110.0
max,14740000.0,2391978.0,409.0,803.4,3257.799,10998.9


In [49]:
kdf.info()

databricks.koalas.frame.DataFrame
Int64Index: 26951165 entries, 0 to 26951164
Data columns (total 13 columns):
 #   Column            Non-Null Count     Dtype  
---  ------            --------------     -----  
 0   Company Code      26951165 non-null  int32  
 1   Order Number      26951165 non-null  int32  
 2   Employee          26951165 non-null  object 
 3   Product           26951165 non-null  object 
 4   Product Category  26951165 non-null  object 
 5   Client            26951165 non-null  object 
 6   Client City       26951165 non-null  object 
 7   Sale Date Time    26951165 non-null  object 
 8   Product Cost      26951165 non-null  float64
 9   Discount Amount   26951165 non-null  float64
 10  Amount            26951165 non-null  float64
 11  Form of payment   26951165 non-null  object 
 12  after tax         26951165 non-null  float64
dtypes: float64(4), int32(2), object(7)

# Spark Streaming

In [74]:
df = pd.read_csv("D:\Data Engineer\Dataset\Stream Dataset\myFile0.csv")
df

Unnamed: 0,event,officeLoc,empFirstName,empLastName,email,empStatus,deviceStatus,profession,tempC
0,1,site A,Natka,Zaslow,Natka.Zaslow@nusanetwork.com,active,DOWN,Network Engineer,37.4


In [45]:
schema = StructType([
    StructField('event',IntegerType(),True),
    StructField('officeLoc',StringType(),True),
    StructField('empFirstName',StringType(),True),
    StructField('empLastName',StringType(),True),
    StructField('email',StringType(),True),
    StructField('empStatus',StringType(),True),
    StructField('deviceStatus',StringType(),True),
    StructField('profession',StringType(),True),
    StructField('tempC',DoubleType(),True),
])

In [16]:
streamquery = spark.readStream.option('maxFilesPerTrigger',1).csv('D:/Data Engineer/Dataset/Stream Dataset',header=True,schema=schema)

In [55]:
streamquery.isStreaming

True

In [47]:
streamquery.printSchema()

root
 |-- event: integer (nullable = true)
 |-- officeLoc: string (nullable = true)
 |-- empFirstName: string (nullable = true)
 |-- empLastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- empStatus: string (nullable = true)
 |-- deviceStatus: string (nullable = true)
 |-- profession: string (nullable = true)
 |-- tempC: double (nullable = true)



In [48]:
complete_query = streamquery.writeStream.queryName('complete_view').format('memory').outputMode('append').start()

In [22]:
for x in range(5):
    spark.sql("select * from complete_view where tempC > 38").show()
    time.sleep(10)

+-----+---------+------------+-----------+--------------------+----------+------------+----------------+-----+
|event|officeLoc|empFirstName|empLastName|               email| empStatus|deviceStatus|      profession|tempC|
+-----+---------+------------+-----------+--------------------+----------+------------+----------------+-----+
|    1|   site C|       Dotty|Fitzsimmons|Dotty.Fitzsimmons...|    active|        DOWN|             BOD| 42.8|
|    1|   site D|     Shandie|   Holbrook|Shandie.Holbrook@...|non-active|        DOWN|           Sales| 49.4|
|    1|   site A|   Annaliese|      Dosia|Annaliese.Dosia@n...|non-active|          UP|   Data Engineer| 39.8|
|    1|   site B|        Dale|    Pacorro|Dale.Pacorro@nusa...|non-active|        DOWN|             BOD| 46.5|
|    1|   site C|     Ardenia|    Hilbert|Ardenia.Hilbert@n...| suspended|        DOWN|Network Engineer| 39.5|
|    1|   site D|       Linet|     Cecile|Linet.Cecile@nusa...| suspended|          UP|       Operation| 50.9|
|

In [49]:
streamToSql = spark.sql("select profession,count(*) as access_count from complete_view group by profession order by 2 desc")
table_name = 'sparkstream'

In [54]:
streamToSql.select('*').show()

+----------------+------------+
|      profession|access_count|
+----------------+------------+
|       Operation|         106|
|Network Engineer|          97|
|   Data Engineer|          83|
|             BOD|          82|
|           Sales|          73|
+----------------+------------+



In [61]:
try:
    streamToSql.write.mode("overwrite") \
        .format("jdbc") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url",url) \
        .option("dbtable",table_name) \
        .option("user",username) \
        .option("password",password) \
        .save()
    
except ValueError as error :
    print("Connector write failed", error)

In [40]:
spark.stop()

# Spark Batch and Stream Processing

In [None]:
#Batch

try:
    sdf.write.mode("overwrite") \
        .format("jdbc") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url",url) \
        .option("dbtable",table_name) \
        .option("user",username) \
        .option("password",password) \
        .save()
    
except ValueError as error :
    print("Connector write failed", error)

In [19]:
try:
    streamquery.write.mode("overwrite") \
        .format("jdbc") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url",url) \
        .option("dbtable",table_name) \
        .option("user",username) \
        .option("password",password) \
        .save()
    
except ValueError as error :
    print("Connector write failed", error)

DataFrame[event: int, officeLoc: string, empFirstName: string, empLastName: string, email: string, empStatus: string, deviceStatus: string, profession: string, tempC: double]

# Ad Hoc Development

In [67]:
sdf.groupBy("Employee").count().show()

+-----------------+------+
|         Employee| count|
+-----------------+------+
|      Rudy Norton|111879|
|     Fannie Evans| 84485|
|   Belinda Hudson| 59568|
|    Bobbie Graves| 81610|
|Wallace Armstrong|     2|
|  Joanne Crawford|  4244|
|      Marsha Tate| 10203|
|       Vicky Bell|   881|
|     Rafael Bryan|    40|
|  Willard Freeman| 70056|
|      Sam Herrera|  4010|
|     Geneva Reese| 13750|
|   Darrell Butler|  2675|
| Francis Gonzalez| 25404|
|  Megan Zimmerman| 58512|
|      Eileen Cook|    50|
|   Wesley Mcguire|   925|
|      Lisa Torres|  6580|
|     Grace Briggs|    52|
|     Donnie Cohen| 25313|
+-----------------+------+
only showing top 20 rows



In [72]:
sdf.select(["Form of payment"]).distinct().show(truncate = False)

+--------------------------------------+
|Form of payment                       |
+--------------------------------------+
|Pre-paid check / Pre- date check      |
|Credit Card                           |
|Check                                 |
|Money                                 |
|Others                                |
|Debit Card                            |
|Extended payment plan/ In installments|
+--------------------------------------+

