### Imports

In [1]:
from IPython.display import display, HTML
# Widen the notebook container to the full browser width so wide tables are easier to read.
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
# Suggestions (TODO):
#     * Send the file over
#     * See examples from Roman
#     * Send data to Postgres in the AWS cloud (remote database)
#     * Add links to where the data is available from
#     * Create a notebook with decorators for interactive use; present it if possible

In [3]:
## imports
import pandas as pd
import numpy as np
import sys
from __future__ import print_function # 
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

from pyspark.sql import SparkSession
import os
# The notebook kernel is the PySpark driver, so point Spark's Python workers at the
# very same interpreter: a driver/worker Python version mismatch makes jobs fail, and a
# hard-coded path like /usr/bin/python3 is wrong inside venv/conda kernels.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Local Spark with 2 worker threads and a 2 GB driver heap.
spark = (
    SparkSession.builder.appName('Data Ingestion Spark')
    .master("local[2]")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

# Make the project's `utility` package (one directory up) importable; guard so that
# re-running this cell does not keep prepending the same entry.
if "../" not in sys.path:
    sys.path.insert(0, "../")
from utility import converter, exporter, missing

In [135]:
# Show the active SparkSession (its rich notebook repr summarises the session).
spark

### Setup

#### CSV, Parquet

In [5]:
## Input/output configuration -- change the contents of this cell to load other files.
# FILE_FORMAT must be one the load cell understands: "csv" or "parquet".
FILE_NAME = "../data/new_merchant_transactions.csv"
FILE_FORMAT = "csv"

# Destination of the export cell; any format name Spark's DataFrameWriter supports
# (e.g. "csv" or "parquet").
NEW_FILE_NAME = "../data/new_merchant_transactions.parquet"
NEW_FILE_FORMAT = "parquet"

# Fail fast on a typo here instead of deep inside a later Spark job.
assert FILE_FORMAT in ("csv", "parquet"), \
    "FILE_FORMAT must be 'csv' or 'parquet', got {!r}".format(FILE_FORMAT)

In [78]:
# Load the raw input with Spark; the reader is chosen by FILE_FORMAT from the config cell.
if FILE_FORMAT == 'csv':
    # inferSchema costs an extra pass over the file but yields typed columns;
    # without it every column is read as a string.
    data = spark.read.csv(FILE_NAME, header=True, inferSchema=True)
elif FILE_FORMAT == 'parquet':
    data = spark.read.parquet(FILE_NAME)
else:
    # Raise instead of printing: otherwise `data` would be undefined (or stale from a
    # previous run) and later cells would fail far away from the real cause.
    raise ValueError(
        "INCORRECT FILE FORMAT: {!r} (expected 'csv' or 'parquet')".format(FILE_FORMAT))




#### Database - Postgres

In [79]:
from getpass import getpass

# Leave table_name empty to skip the database cell and use the file loaded above.
table_name = ""
# Connection settings come from the standard libpq environment variables, with the
# previous values as defaults for the non-secret ones.
username = os.environ.get("PGUSER", "sahil")
database = os.environ.get("PGDATABASE", "spark_demo_db")
host = os.environ.get("PGHOST", "localhost")
# Never hard-code the password in the notebook: read it from the environment and only
# prompt when a table will actually be read.
password = os.environ.get("PGPASSWORD") or (getpass("Postgres password: ") if table_name else "")

In [80]:
## Read from Database (only when a table name is configured above)
from urllib.parse import quote_plus

from sqlalchemy import create_engine

if table_name != "":
    # URL-quote the credentials so characters such as "@", ":" or "/" in them
    # cannot corrupt the connection URL.
    engine = create_engine('postgresql://{}:{}@{}:5432/{}'.format(
        quote_plus(username), quote_plus(password), host, database))
    try:
        # pandas reads the table; converting it to a Spark DataFrame keeps `data` the
        # same kind of object the file loader produces, which every later cell relies on
        # (withColumnRenamed, printSchema, limit, ...).
        data = spark.createDataFrame(pd.read_sql_table(table_name, engine))
    finally:
        # Release pooled connections; the engine is not used after this cell.
        engine.dispose()

### Basic Sanity Check

In [81]:
# Normalise column names: lower case, and replace spaces with _ (underscore).
# A single toDF() renames every column positionally in one step, and avoids a loop
# variable named `col` that would shadow pyspark.sql.functions.col.
data = data.toDF(*[name.lower().replace(" ", "_") for name in data.columns])

In [82]:
## Display column names
from pprint import pprint
pprint(sorted(list(data.columns)))

['authorized_flag',
 'card_id',
 'category_1',
 'category_2',
 'category_3',
 'city_id',
 'installments',
 'merchant_category_id',
 'merchant_id',
 'month_lag',
 'purchase_amount',
 'purchase_date',
 'state_id',
 'subsector_id']


In [83]:
## Shape of dataframe as (rows, columns).
# Spark DataFrames have no .shape; count() is an action that runs a job over the data.
data.count(), len(data.columns)

(1963031, 14)

In [84]:
# Column names, Spark types and nullability of the loaded data.
data.printSchema()

root
 |-- authorized_flag: string (nullable = true)
 |-- card_id: string (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- category_1: string (nullable = true)
 |-- installments: integer (nullable = true)
 |-- category_3: string (nullable = true)
 |-- merchant_category_id: integer (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- month_lag: integer (nullable = true)
 |-- purchase_amount: double (nullable = true)
 |-- purchase_date: timestamp (nullable = true)
 |-- category_2: double (nullable = true)
 |-- state_id: integer (nullable = true)
 |-- subsector_id: integer (nullable = true)



In [85]:
## Preview: the first five rows, collected to pandas for a rich table display
data.limit(5).toPandas()

Unnamed: 0,authorized_flag,card_id,city_id,category_1,installments,category_3,merchant_category_id,merchant_id,month_lag,purchase_amount,purchase_date,category_2,state_id,subsector_id
0,Y,C_ID_415bb3a509,107,N,1,B,307,M_ID_b0c793002c,1,-0.557574,2018-03-11 14:57:36,1.0,9,19
1,Y,C_ID_415bb3a509,140,N,1,B,307,M_ID_88920c89e8,1,-0.56958,2018-03-19 18:53:37,1.0,9,19
2,Y,C_ID_415bb3a509,330,N,1,B,507,M_ID_ad5237ef6b,2,-0.551037,2018-04-26 14:08:44,1.0,9,14
3,Y,C_ID_415bb3a509,-1,Y,1,B,661,M_ID_9e84cda3b1,1,-0.671925,2018-03-07 09:43:21,,-1,8
4,Y,C_ID_ef55cf8d4b,-1,Y,1,B,166,M_ID_3c86fa3831,1,-0.659904,2018-03-22 21:07:53,,-1,29


In [86]:
## TODO: add more description of how this interactive viewer was built.
## TODO: spend time understanding how decorators work (object-oriented Python, TDD).
# @interact renders a widget for each argument; the list default becomes a dropdown
# of the column names.
# NOTE(review): the name does not describe what the function does (it previews one
# column); it is kept unchanged so existing references keep working.
@interact
def show_articles_more_than(column=sorted(data.columns)):
    """Return the first 15 values of the selected column as a pandas DataFrame."""
    first_rows = data.select(column).limit(15)
    return first_rows.toPandas()

### Convert to correct type

In [107]:
from pyspark.sql.functions import col, to_timestamp
def convert_to(data, column_list, new_type, inplace=True, suffix="_TS"):
    """
    Cast one or more columns of a Spark DataFrame to another type.

    Available types are the type names Spark's ``Column.cast`` accepts,
    e.g. int, float, double, string, timestamp.

    Parameters
    ----------
    data : Spark DataFrame
    column_list : str or list of str
        Name(s) of the column(s) to cast.
    new_type : str
        Target type name; case-insensitive (it is lower-cased before use).
    inplace : bool, default True
        If true, replace each column with its cast version. If false, keep the
        original column and add the cast copy as ``<column><suffix>``.
        Spark DataFrames are immutable either way, so always use the return value.
    suffix : str, default "_TS"
        Name suffix for the new columns when ``inplace`` is false. The default is
        kept for backward compatibility; pass e.g. "_str" when casting to a
        non-timestamp type so the name does not suggest a timestamp.

    Returns
    -------
    Spark DataFrame with the cast column(s).
    """
    # Accept a single column name as well as a list of names.
    if isinstance(column_list, str):
        column_list = [column_list]

    new_type = new_type.lower()

    for col_name in column_list:
        target = col_name if inplace else col_name + suffix
        data = data.withColumn(target, col(col_name).cast(new_type))

    return data

def convert_to_timestamp(data, column_list, timestamp_format, inplace=True, suffix="_TS"):
    """
    Parse one or more string columns of a Spark DataFrame into timestamps.

    Parameters
    ----------
    data : Spark DataFrame
    column_list : str or list of str
        Name(s) of the column(s) to parse.
    timestamp_format : str
        Spark datetime pattern passed to ``to_timestamp``, e.g. 'yyyy-MM-dd HH:mm:ss'.
    inplace : bool, default True
        If true, replace each column with the parsed version. If false, keep the
        original column and add the parsed copy as ``<column><suffix>``.
    suffix : str, default "_TS"
        Name suffix for the new columns when ``inplace`` is false.

    Returns
    -------
    Spark DataFrame with the parsed column(s).
    """
    # Accept a single column name as well as a list of names.
    if isinstance(column_list, str):
        column_list = [column_list]

    for col_name in column_list:
        target = col_name if inplace else col_name + suffix
        data = data.withColumn(target, to_timestamp(col_name, format=timestamp_format))
    return data

# data = converter.convert_to(data, ["purchase_date"], 'timestamp')

In [108]:
# Round-trip demo, step 1: purchase_date is already a timestamp (see the schema above), so
# copy it as a string into purchase_date_TS; inplace=False keeps the original column.
data = convert_to(data, "purchase_date", "string", inplace=False)
data.printSchema()

root
 |-- authorized_flag: string (nullable = true)
 |-- card_id: string (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- category_1: string (nullable = true)
 |-- installments: integer (nullable = true)
 |-- category_3: string (nullable = true)
 |-- merchant_category_id: integer (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- month_lag: integer (nullable = true)
 |-- purchase_amount: double (nullable = true)
 |-- purchase_date: timestamp (nullable = true)
 |-- category_2: double (nullable = true)
 |-- state_id: integer (nullable = true)
 |-- subsector_id: integer (nullable = true)
 |-- purchase_date_TS: string (nullable = true)



In [109]:
# Round-trip demo, step 2: parse the string copy back into a timestamp (in place)
# using an explicit Spark datetime pattern.
data = convert_to_timestamp(data, "purchase_date_TS", 'yyyy-MM-dd HH:mm:ss')
data.printSchema()

root
 |-- authorized_flag: string (nullable = true)
 |-- card_id: string (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- category_1: string (nullable = true)
 |-- installments: integer (nullable = true)
 |-- category_3: string (nullable = true)
 |-- merchant_category_id: integer (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- month_lag: integer (nullable = true)
 |-- purchase_amount: double (nullable = true)
 |-- purchase_date: timestamp (nullable = true)
 |-- category_2: double (nullable = true)
 |-- state_id: integer (nullable = true)
 |-- subsector_id: integer (nullable = true)
 |-- purchase_date_TS: timestamp (nullable = true)



In [111]:
# Preview the first five rows after the conversion, collected to pandas.
data.limit(5).toPandas()

Unnamed: 0,authorized_flag,card_id,city_id,category_1,installments,category_3,merchant_category_id,merchant_id,month_lag,purchase_amount,purchase_date,category_2,state_id,subsector_id,purchase_date_TS
0,Y,C_ID_415bb3a509,107,N,1,B,307,M_ID_b0c793002c,1,-0.557574,2018-03-11 14:57:36,1.0,9,19,2018-03-11 14:57:36
1,Y,C_ID_415bb3a509,140,N,1,B,307,M_ID_88920c89e8,1,-0.56958,2018-03-19 18:53:37,1.0,9,19,2018-03-19 18:53:37
2,Y,C_ID_415bb3a509,330,N,1,B,507,M_ID_ad5237ef6b,2,-0.551037,2018-04-26 14:08:44,1.0,9,14,2018-04-26 14:08:44
3,Y,C_ID_415bb3a509,-1,Y,1,B,661,M_ID_9e84cda3b1,1,-0.671925,2018-03-07 09:43:21,,-1,8,2018-03-07 09:43:21
4,Y,C_ID_ef55cf8d4b,-1,Y,1,B,166,M_ID_3c86fa3831,1,-0.659904,2018-03-22 21:07:53,,-1,29,2018-03-22 21:07:53


In [126]:
# data = data

### Missing Value Statistics

In [133]:
# Alias Spark's sum so it does not shadow Python's builtin sum for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum


def missing_df(df):
    """
    Summarise null values per column of a Spark DataFrame.

    Parameters
    ----------
    df : Spark DataFrame

    Returns
    -------
    Spark DataFrame with columns ``column``, ``missing`` (number of null values) and
    ``pct_missing`` (percentage of rows that are null), sorted by ``pct_missing``
    descending (ties keep the original column order).
    """
    num_rows = df.count()
    # One aggregation over the data: for every column, count rows where it is null.
    null_counts = df.select(
        *(spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)
    )

    # The result is a single row, so reshape it in pandas and convert it back to Spark.
    summary = null_counts.toPandas().T.reset_index()
    summary.columns = ["column", "missing"]
    # sum() over an empty DataFrame yields null; report that as zero missing values.
    summary["missing"] = summary["missing"].fillna(0).astype("int64")
    # Guard against division by zero for an empty DataFrame.
    summary["pct_missing"] = summary["missing"] * 100.0 / num_rows if num_rows else 0.0
    summary = summary.sort_values("pct_missing", ascending=False, kind="mergesort")
    return spark.createDataFrame(summary)


missing_df(data).show()

+--------------------+-------+------------------+
|              column|missing|       pct_missing|
+--------------------+-------+------------------+
|          category_2| 111745| 5.692472508075522|
|          category_3|  55922|2.8487578647509895|
|         merchant_id|  26216|1.3354857870303627|
|     authorized_flag|      0|               0.0|
|             card_id|      0|               0.0|
|             city_id|      0|               0.0|
|          category_1|      0|               0.0|
|        installments|      0|               0.0|
|merchant_category_id|      0|               0.0|
|           month_lag|      0|               0.0|
|     purchase_amount|      0|               0.0|
|       purchase_date|      0|               0.0|
|            state_id|      0|               0.0|
|        subsector_id|      0|               0.0|
|    purchase_date_TS|      0|               0.0|
+--------------------+-------+------------------+



### General Statistics

In [137]:
# Summary statistics (count, mean, stddev, min, max) per column, shown via pandas.
# Note: the timestamp columns (purchase_date, purchase_date_TS) do not appear in the output below.
data.describe().toPandas()

Unnamed: 0,summary,authorized_flag,card_id,city_id,category_1,installments,category_3,merchant_category_id,merchant_id,month_lag,purchase_amount,category_2,state_id,subsector_id
0,count,1963031,1963031,1963031.0,1963031,1963031.0,1907109,1963031.0,1936815,1963031.0,1963031.0,1851286.0,1963031.0,1963031.0
1,mean,,,134.38667906925565,,0.68296425272958,,430.9701349596619,,1.476515144182644,-0.5509689991947693,2.1978413924158664,10.880668720972825,25.976241332918327
2,stddev,,,101.51524185325826,,1.5840690279734224,,246.3385132167812,,0.4994482842301151,0.6940042578499607,1.5281252821736917,6.038542094835003,10.129082649402267
3,min,Y,C_ID_00007093c1,-1.0,N,-1.0,A,-1.0,M_ID_000025127f,1.0,-0.74689277,1.0,-1.0,-1.0
4,max,Y,C_ID_fffffd5772,347.0,Y,999.0,C,891.0,M_ID_ffff0af8e7,2.0,263.15749789,5.0,24.0,41.0


### Export File to the Required Format

In [140]:
# Write the transformed data in the target format. mode("overwrite") keeps the cell
# re-runnable: Spark's default save mode raises an error if the path already exists.
writer = data.write.format(NEW_FILE_FORMAT).mode("overwrite")
if NEW_FILE_FORMAT == "csv":
    # Write a header row so the CSV export can be read back with header=True like the input.
    writer = writer.option("header", True)
writer.save(NEW_FILE_NAME)

In [141]:
# Read the export back using the format it was written in, and check no rows were lost.
reader = spark.read.format(NEW_FILE_FORMAT)
if NEW_FILE_FORMAT == "csv":
    reader = reader.option("header", True)
data2 = reader.load(NEW_FILE_NAME)
exported_rows = data2.count()
assert exported_rows == data.count(), "row count changed during export"
exported_rows

1963031

In [142]:
# Re-read the raw CSV with schema inference, for comparison with the export above.
data2 = spark.read.option("header", True).option("inferSchema", True).csv(FILE_NAME)
data2.count()

1963031

In [143]:
# Same file without schema inference: every column is read as a string.
data3 = spark.read.options(header=True, inferSchema=False).csv(FILE_NAME)
data3.count()

1963031

In [145]:
# Without inferSchema every column comes back as a string (compare with the typed schema earlier).
data3.printSchema()

root
 |-- authorized_flag: string (nullable = true)
 |-- card_id: string (nullable = true)
 |-- city_id: string (nullable = true)
 |-- category_1: string (nullable = true)
 |-- installments: string (nullable = true)
 |-- category_3: string (nullable = true)
 |-- merchant_category_id: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- month_lag: string (nullable = true)
 |-- purchase_amount: string (nullable = true)
 |-- purchase_date: string (nullable = true)
 |-- category_2: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- subsector_id: string (nullable = true)

