In [1]:
# Imports 

#import findspark
#findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.types import*

#from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import requests
import seaborn as sns


## Loading Data and creating Spark DataFrames

In [None]:
spark = SparkSession\
    .builder\
    .appName("capstone_analysis")\
    .config("spark.jars”,”/Users/purple/Documents/Project/Data_Engineering/Python/Capstone1/data/mysql-connector-j-8.0.32.jar")\
    .getOrCreate()

In [None]:
spark.stop()

In [2]:
spark = SparkSession.builder.master("local[1]").appName("Credit_Cards").getOrCreate()

In [3]:
df_branch = spark.read.json('data/cdw_sapp_branch.json')  
df_credit = spark.read.json('data/cdw_sapp_credit.json') 
df_customers = spark.read.json('data/cdw_sapp_custmer.json')

In [4]:
type(df_branch)

pyspark.sql.dataframe.DataFrame

## Cleaning DataFrame according to mapping doc

In [5]:
df_branch.show(5)

+-----------------+-----------+------------+------------+------------+-----------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+-------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18 16:51:47|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18 16:51:47|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|    Warren Street|     11419|2018-04-18 16:51:47|
|       Middleburg|          4|Example Bank|  1234663064|          FL| Cleveland Street|     32068|2018-04-18 16:51:47|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|      14th Street|     19406|2018-04-18 16:51:47|
+-----------------+-----------+---------

In [6]:
df_branch.columns

['BRANCH_CITY',
 'BRANCH_CODE',
 'BRANCH_NAME',
 'BRANCH_PHONE',
 'BRANCH_STATE',
 'BRANCH_STREET',
 'BRANCH_ZIP',
 'LAST_UPDATED']

In [7]:
df_branch.select('BRANCH_PHONE').show(5)

+------------+
|BRANCH_PHONE|
+------------+
|  1234565276|
|  1234618993|
|  1234985926|
|  1234663064|
|  1234849701|
+------------+
only showing top 5 rows



In [8]:
#transforming the branch df
df_branch = df_branch.withColumn('BRANCH_PHONE', concat(lit('('), col('BRANCH_PHONE')[0:3], lit(')'), 
                                col('BRANCH_PHONE')[4:3], lit('-'), col('BRANCH_PHONE')[7:9]))

df_branch.select('BRANCH_PHONE').show(5)

+-------------+
| BRANCH_PHONE|
+-------------+
|(123)456-5276|
|(123)461-8993|
|(123)498-5926|
|(123)466-3064|
|(123)484-9701|
+-------------+
only showing top 5 rows



In [9]:
#checking for nulls in zip code
df_branch.filter(df_branch.BRANCH_ZIP.isNull()).show()

+-----------+-----------+-----------+------------+------------+-------------+----------+------------+
|BRANCH_CITY|BRANCH_CODE|BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|BRANCH_STREET|BRANCH_ZIP|LAST_UPDATED|
+-----------+-----------+-----------+------------+------------+-------------+----------+------------+
+-----------+-----------+-----------+------------+------------+-------------+----------+------------+



In [10]:
#there are no nulls in branch zip
print(df_branch.count())
print(df_branch.filter(col('BRANCH_ZIP').isNull()).count())
print(df_branch.filter(col('BRANCH_ZIP').isNotNull()).count())

115
0
115


### Keep??

In [11]:
df_branch.na.fill(value=99999,subset=['BRANCH_ZIP'])


DataFrame[BRANCH_CITY: string, BRANCH_CODE: bigint, BRANCH_NAME: string, BRANCH_PHONE: string, BRANCH_STATE: string, BRANCH_STREET: string, BRANCH_ZIP: bigint, LAST_UPDATED: timestamp]

In [12]:
df_credit.show(5)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
only showing top 5 rows



In [13]:
df_credit.columns

['BRANCH_CODE',
 'CREDIT_CARD_NO',
 'CUST_SSN',
 'DAY',
 'MONTH',
 'TRANSACTION_ID',
 'TRANSACTION_TYPE',
 'TRANSACTION_VALUE',
 'YEAR']

In [14]:
df_credit = df_credit.withColumnRenamed('CREDIT_CARD_NO', 'CUST_CC_NO')
df_credit = df_credit.withColumn('TIMEID', expr('make_date(YEAR, MONTH, DAY)'))
df_credit.show(5)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------+
|BRANCH_CODE|      CUST_CC_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|    TIMEID|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|2018-02-14|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|2018-03-20|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|2018-07-08|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|2018-04-19|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|2018-10-10|
+-----------+----------------+---------+---+-----+--------------

In [15]:
#df_credit = df_credit.drop('DAY', 'MONTH', 'YEAR')
#df_credit.columns

### Clarify customer phone conversion - only have 7 digits

In [16]:
df_customers.select('CUST_PHONE').show(5)

+----------+
|CUST_PHONE|
+----------+
|   1237818|
|   1238933|
|   1243018|
|   1243215|
|   1242074|
+----------+
only showing top 5 rows



In [17]:
df_customers.show(5)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21 12:49:02|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21 12:49:02|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States|WDunham@example.com|   

In [18]:
df_customers.columns

['APT_NO',
 'CREDIT_CARD_NO',
 'CUST_CITY',
 'CUST_COUNTRY',
 'CUST_EMAIL',
 'CUST_PHONE',
 'CUST_STATE',
 'CUST_ZIP',
 'FIRST_NAME',
 'LAST_NAME',
 'LAST_UPDATED',
 'MIDDLE_NAME',
 'SSN',
 'STREET_NAME']

In [19]:
#transforming the customer df
df_customers = df_customers.withColumn('FIRST_NAME', initcap(df_customers['FIRST_NAME']))
df_customers = df_customers.withColumn('MIDDLE_NAME', lower(df_customers['MIDDLE_NAME']))
df_customers = df_customers.withColumn('LAST_NAME', initcap(df_customers['LAST_NAME']))
df_customers = df_customers.withColumnRenamed('CREDIT_CARD_NO', 'Credit_card_no')
df_customers = df_customers.withColumn('FULL_STREET_ADDRESS', concat(col('APT_NO'),lit(','),col('STREET_NAME')))
df_customers.show(5)


+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+-----------------+--------------------+
|APT_NO|  Credit_card_no|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME| FULL_STREET_ADDRESS|
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+-----------------+--------------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21 12:49:02|         wm|123456100|Main Street North|656,Main Street N...|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21 12:49:02|    brendan|123453023|    Redwoo

In [20]:
#dropping address and apt cols
df_customers = df_customers.drop('APT_NO', 'STREET_NAME')
df_customers.columns

['Credit_card_no',
 'CUST_CITY',
 'CUST_COUNTRY',
 'CUST_EMAIL',
 'CUST_PHONE',
 'CUST_STATE',
 'CUST_ZIP',
 'FIRST_NAME',
 'LAST_NAME',
 'LAST_UPDATED',
 'MIDDLE_NAME',
 'SSN',
 'FULL_STREET_ADDRESS']

In [21]:
print(df_branch.show())
print(df_credit.show())
print(df_customers.show())

+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-------------------+----------+-------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|       Bridle Court|     55044|2018-04-18 16:51:47|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|  Washington Street|     60142|2018-04-18 16:51:47|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|      Warren Street|     11419|2018-04-18 16:51:47|
|       Middleburg|          4|Example Bank|(123)466-3064|          FL|   Cleveland Street|     32068|2018-04-18 16:51:47|
|    KingOfPrussia|          5|Example Bank|(123)484-9701|          PA|        14th Street|     19406|2018-04-18 16:51:47|
|         Paters

In [22]:
df_branch.printSchema()


root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [23]:
df_credit.printSchema()


root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CUST_CC_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)
 |-- TIMEID: date (nullable = true)



In [24]:
df_customers.printSchema()

root
 |-- Credit_card_no: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



Once PySpark reads data from JSON files, and then utilizes Python, PySpark, and Python modules to load data into RDBMS(SQL), perform the following:
 
Create a Database in SQL(MariaDB), named “creditcard_capstone.”
Create a Python and Pyspark Program to load/write the “Credit Card System Data” into RDBMS(creditcard_capstone).
Tables should be created by the following names in RDBMS:
CDW_SAPP_BRANCH
CDW_SAPP_CREDIT_CARD
CDW_SAPP_CUSTOMER 

In [None]:
df_branch.write.format("jdbc") \
  .mode("overwrite") \
  .option("driver", "com.mysql.cj.jdbc.Driver")\
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

In [None]:
df_credit.write.format("jdbc") \
  .mode("overwrite") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

In [None]:
df_branch.write.format("jdbc") \
  .mode("overwrite") \
  .option("driver", "com.mysql.cj.jdbc.Driver")\
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

In [None]:
df_customers.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

In [None]:
df_branch.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_BRANCH") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

df_credit.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

df_customers.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW_SAPP_CUSTOMER ") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

1)    Used to display the transactions made by customers living in a given zip code for a given month and year. Order by day in descending order.
2)    Used to display the number and total values of transactions for a given type.
3)    Used to display the number and total values of transactions for branches in a given state.

In [None]:
pd_branch = df_branch.toPandas()
pd_credit = df_credit.toPandas()
pd_customers = df_customers.toPandas()


In [None]:
pd_credit.head()

In [None]:
pd_credit.info()

In [None]:
pd_customers.head()

In [None]:
pd_credit = pd_credit.rename(columns = {'CUST_SSN':'SSN'})
pd_credit.head()

In [None]:
pd_customer_credit = pd.merge(pd_credit, pd_customers, on ='SSN')
pd_customer_credit.info()

In [None]:
pd_customer_credit['YEAR'].value_counts()

In [None]:
pd_customer_credit = pd_customer_credit.drop(['CUST_CC_NO','CUST_EMAIL','TIMEID','TRANSACTION_ID',
                'Credit_card_no', 'CUST_PHONE', 'LAST_UPDATED', 'CUST_CITY','CUST_COUNTRY'], axis=1)
pd_customer_credit.head()

In [None]:
pd_trans_type = pd_customer_credit.groupby(['TRANSACTION_TYPE'])['TRANSACTION_VALUE'].sum()
pd_trans_type

In [None]:
#https://towardsdatascience.com/bring-your-jupyter-notebook-to-life-with-interactive-widgets-bc12e03f0916
ALL = 'ALL'
def unique_sorted_values_plus_ALL(array):
    unique = array.unique().tolist()
    unique.sort()
    unique.insert(0, ALL)
    return unique

In [None]:
output = widgets.Output()

dropdown_zipcode = widgets.Dropdown(options = unique_sorted_values_plus_ALL(pd_customer_credit.CUST_ZIP))
dropdown_month = widgets.Dropdown(options = unique_sorted_values_plus_ALL(pd_customer_credit.MONTH))

def common_filtering(month, zipcode):
    output.clear_output()
    
    if (month == ALL) & (zipcode == ALL):
        common_filter = pd_customer_credit
    elif (month == ALL):
        common_filter = pd_customer_credit[pd_customer_credit.CUST_ZIP == zipcode]
    elif (zipcode == ALL):
        common_filter = pd_customer_credit[pd_customer_credit.MONTH == month]
    else:
        common_filter = pd_customer_credit[(pd_customer_credit.MONTH == month) & 
                                  (pd_customer_credit.CUST_ZIP == zipcode)]
    
    with output:
        display(common_filter)
        
        
def dropdown_month_eventhandler(change):
    common_filtering(change.new, dropdown_zipcode.value)
    
def dropdown_zipcode_eventhandler(change):
    common_filtering(dropdown_month.value, change.new)

dropdown_month.observe(dropdown_month_eventhandler, names='value')
dropdown_zipcode.observe(dropdown_zipcode_eventhandler, names='value')



display(dropdown_zipcode)
display(dropdown_month)

In [None]:
display(output)

In [25]:
df_cust_and_credit = df_credit.join(df_customers, df_credit.CUST_SSN == df_customers.SSN,"outer")

In [58]:
#changing BRANCH_CODE name - to avoid ambigous column error when querying
df_branch = df_branch.withColumnRenamed("BRANCH_CODE","BRANCHCODE") 
df_branch_and_credit = df_credit.join(df_branch, df_credit.BRANCH_CODE == df_branch.BRANCHCODE, "outer")

In [45]:
df_cust_and_credit.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CUST_CC_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)
 |-- Credit_card_no: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



In [28]:
df_branch_and_credit.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CUST_CC_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)
 |-- TIMEID: date (nullable = true)
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [59]:
df_cust_and_credit = df_cust_and_credit.drop('TIMEID')
df_cust_and_credit = df_cust_and_credit.drop('LAST_UPDATED')
df_branch_and_credit = df_branch_and_credit.drop('BRANCHCODE')

In [30]:
df_cust_and_credit.select([count(when(isnan(c), c)).alias(c) for c in df_cust_and_credit.columns]).show()

+-----------+----------+--------+---+-----+--------------+----------------+-----------------+----+--------------+---------+------------+----------+----------+----------+--------+----------+---------+-----------+---+-------------------+
|BRANCH_CODE|CUST_CC_NO|CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|Credit_card_no|CUST_CITY|CUST_COUNTRY|CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|MIDDLE_NAME|SSN|FULL_STREET_ADDRESS|
+-----------+----------+--------+---+-----+--------------+----------------+-----------------+----+--------------+---------+------------+----------+----------+----------+--------+----------+---------+-----------+---+-------------------+
|          0|         0|       0|  0|    0|             0|               0|                0|   0|             0|        0|           0|         0|         0|         0|       0|         0|        0|          0|  0|                  0|
+-----------+----------+--------+---+-----+-------------

In [31]:
df_cust_and_credit.select([count(when(col(c).isNull(), c)).alias(c) for c in df_cust_and_credit.columns]).show()

+-----------+----------+--------+---+-----+--------------+----------------+-----------------+----+--------------+---------+------------+----------+----------+----------+--------+----------+---------+-----------+---+-------------------+
|BRANCH_CODE|CUST_CC_NO|CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|Credit_card_no|CUST_CITY|CUST_COUNTRY|CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|MIDDLE_NAME|SSN|FULL_STREET_ADDRESS|
+-----------+----------+--------+---+-----+--------------+----------------+-----------------+----+--------------+---------+------------+----------+----------+----------+--------+----------+---------+-----------+---+-------------------+
|          0|         0|       0|  0|    0|             0|               0|                0|   0|             0|        0|           0|         0|         0|         0|       0|         0|        0|          0|  0|                  0|
+-----------+----------+--------+---+-----+-------------

In [60]:
df_branch.createOrReplaceTempView("branch")
df_credit.createOrReplaceTempView("credit")
df_customers.createOrReplaceTempView("customers")
df_cust_and_credit.createOrReplaceTempView("cc")
df_branch_and_credit.createOrReplaceTempView("bcr")

In [33]:
df_cust_and_credit.show(2)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+---------+-------------+-------------------+----------+----------+--------+----------+---------+-----------+---------+-------------------+
|BRANCH_CODE|      CUST_CC_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|  Credit_card_no|CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|MIDDLE_NAME|      SSN|FULL_STREET_ADDRESS|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+----------------+---------+-------------+-------------------+----------+----------+--------+----------+---------+-----------+---------+-------------------+
|         93|4210653341381529|123452490| 14|    7|         39544|      Healthcare|             13.1|2018|4210653341381529|   Vienna|United States|CSavage@example.com|   1242572|        VA|   22180|     Clara|   Savage

In [34]:
def get_transactions_by_zipcode(zipcode, year, month):
    cols = "SELECT cc.LAST_NAME, cc.FIRST_NAME, cc.CUST_ZIP, cc.YEAR, cc.MONTH, cc.DAY"
    table = " FROM cc"
    limits = f" WHERE cc.CUST_ZIP = {str(zipcode)} AND cc.YEAR = {str(year)} AND cc.MONTH = {str(month)}"
    sorting = " ORDER BY cc.DAY DESC" 

    results = spark.sql(cols + table + limits + sorting)
    results.show(150)

get_transactions_by_zipcode(22180, 2018, 7)

+---------+----------+--------+----+-----+---+
|LAST_NAME|FIRST_NAME|CUST_ZIP|YEAR|MONTH|DAY|
+---------+----------+--------+----+-----+---+
|      Poe|    Leslie|   22180|2018|    7| 28|
|      Poe|    Leslie|   22180|2018|    7| 28|
|  Salinas|     Keven|   22180|2018|    7| 28|
|      Poe|    Leslie|   22180|2018|    7| 28|
|  Conklin|  Harrison|   22180|2018|    7| 27|
|   Ramsey|      Omar|   22180|2018|    7| 26|
|  Salinas|     Keven|   22180|2018|    7| 24|
|  Salinas|     Keven|   22180|2018|    7| 23|
|   Savage|     Clara|   22180|2018|    7| 23|
|  Conklin|  Harrison|   22180|2018|    7| 23|
|   Savage|     Clara|   22180|2018|    7| 22|
|  Salinas|     Keven|   22180|2018|    7| 21|
|  Salinas|     Keven|   22180|2018|    7| 21|
|   Savage|     Clara|   22180|2018|    7| 20|
|   Ramsey|      Omar|   22180|2018|    7| 19|
|  Mcclain|      Kurt|   22180|2018|    7| 18|
|   Ramsey|      Omar|   22180|2018|    7| 18|
|      Poe|    Leslie|   22180|2018|    7| 18|
|      Poe|  

In [35]:
transaction_type = input("Please choose transaction type from the following - Education, Entertainment\
Healthcare, Grocery, Test, Gas, Bills: " )

def get_transaction_by_type(transaction_type):
    transactions = spark.sql("""
        SELECT TRANSACTION_VALUE
        FROM credit
        WHERE TRANSACTION_TYPE = '{}'
    """.format(transaction_type))

    num_transactions = transactions.count()
    total_value = transactions.agg({"TRANSACTION_VALUE": "sum"}).collect()[0][0]

    print("Number of {} transactions: {}".format(transaction_type, num_transactions))
    print("Total value of {} transactions: ${:.2f}".format(transaction_type, total_value))
    
get_transaction_by_type(transaction_type)

Please choose transaction type from the following - Education, Entertainment                         Healthcare, Grocery, Test, Gas, Bills: Entertainment
Number of Entertainment transactions: 6635
Total value of Entertainment transactions: $338950.10


In [36]:
pd_cust_and_credit = df_cust_and_credit.toPandas()
pd_cust_and_credit['CUST_STATE'].value_counts()

NY    4825
GA    3772
PA    3463
FL    3197
IL    2961
MI    2546
MD    2365
NJ    2294
CA    2209
OH    2018
NC    1957
VA    1859
MA    1746
TX    1562
WI    1549
SC    1358
MN    1145
MS    1085
KY     971
WA     923
IA     878
CT     693
IN     594
MT     286
AR     234
AL     204
Name: CUST_STATE, dtype: int64

In [61]:
state_name = input("Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH, \
 NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): " )

def transactions_by_state(state_name):
    trans_by_state = spark.sql("SELECT BRANCH_CODE, COUNT(*) AS num_transactions, SUM(TRANSACTION_VALUE) AS total_value \
                               FROM bcr \
                               WHERE BRANCH_STATE = '{}' \
                               GROUP BY BRANCH_CODE".format(state_name))
    display(trans_by_state)
    
transactions_by_state(state_name)

Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH,  NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): GA


DataFrame[BRANCH_CODE: bigint, num_transactions: bigint, total_value: double]

In [62]:
state_name = input("Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH, \
                   NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): " )

def get_transaction_by_state(state_name):
    transactions = spark.sql("""
        SELECT TRANSACTION_VALUE
        FROM bcr
        WHERE BRANCH_STATE = '{}'
        GROUP BY BRANCH_CODE
    """.format(state_name))

    num_transactions = transactions.count()
    total_value = transactions.agg({"TRANSACTION_VALUE": "sum"}).collect()[0][0]

    print("Number of {} transactions: {}".format(state_name, num_transactions))
    print("Total value of {} transactions: ${:.2f}".format(state_name, total_value))
    
get_transaction_by_state(state_name)

Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH,                    NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): AL


AnalysisException: expression 'bcr.`TRANSACTION_VALUE`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [BRANCH_CODE#30L], [TRANSACTION_VALUE#37]
+- Filter (BRANCH_STATE#11 = AL)
   +- SubqueryAlias bcr
      +- Project [BRANCH_CODE#30L, CUST_CC_NO#292, CUST_SSN#32L, DAY#33L, MONTH#34L, TRANSACTION_ID#35L, TRANSACTION_TYPE#36, TRANSACTION_VALUE#37, YEAR#38L, TIMEID#302, BRANCH_CITY#7, BRANCH_NAME#9, BRANCH_PHONE#131, BRANCH_STATE#11, BRANCH_STREET#12, BRANCH_ZIP#13L, LAST_UPDATED#14]
         +- Join FullOuter, (BRANCH_CODE#30L = BRANCHCODE#1549L)
            :- Project [BRANCH_CODE#30L, CUST_CC_NO#292, CUST_SSN#32L, DAY#33L, MONTH#34L, TRANSACTION_ID#35L, TRANSACTION_TYPE#36, TRANSACTION_VALUE#37, YEAR#38L, make_date(cast(YEAR#38L as int), cast(MONTH#34L as int), cast(DAY#33L as int)) AS TIMEID#302]
            :  +- Project [BRANCH_CODE#30L, CREDIT_CARD_NO#31 AS CUST_CC_NO#292, CUST_SSN#32L, DAY#33L, MONTH#34L, TRANSACTION_ID#35L, TRANSACTION_TYPE#36, TRANSACTION_VALUE#37, YEAR#38L]
            :     +- Relation[BRANCH_CODE#30L,CREDIT_CARD_NO#31,CUST_SSN#32L,DAY#33L,MONTH#34L,TRANSACTION_ID#35L,TRANSACTION_TYPE#36,TRANSACTION_VALUE#37,YEAR#38L] json
            +- Project [BRANCH_CITY#7, BRANCH_CODE#8L AS BRANCHCODE#1549L, BRANCH_NAME#9, BRANCH_PHONE#131, BRANCH_STATE#11, BRANCH_STREET#12, BRANCH_ZIP#13L, LAST_UPDATED#14]
               +- Project [BRANCH_CITY#7, BRANCH_CODE#8L, BRANCH_NAME#9, concat((, substring(BRANCH_PHONE#10, 0, 3), ), substring(BRANCH_PHONE#10, 4, 3), -, substring(BRANCH_PHONE#10, 7, 9)) AS BRANCH_PHONE#131, BRANCH_STATE#11, BRANCH_STREET#12, BRANCH_ZIP#13L, LAST_UPDATED#14]
                  +- Relation[BRANCH_CITY#7,BRANCH_CODE#8L,BRANCH_NAME#9,BRANCH_PHONE#10,BRANCH_STATE#11,BRANCH_STREET#12,BRANCH_ZIP#13L,LAST_UPDATED#14] json


In [76]:
state = input("Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH, \
  NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): " )

def get_branch_transaction_info(state):
    # Filter transactions by branches in the given state
    transactions = spark.sql("""
        SELECT bcr.BRANCH_CODE, bcr.TRANSACTION_VALUE
        FROM bcr
        WHERE bcr.BRANCH_STATE = '{}'
    """.format(state))

    # Get number of transactions and total transaction value by branch code
    transactions_by_branch = transactions.groupby('BRANCH_CODE').agg(
        count('TRANSACTION_VALUE').alias('NUM_TRANSACTIONS'),
        round(sum('TRANSACTION_VALUE'),2).alias('TOTAL_VALUE')
    )

    # Print results
    print("Number of transactions by branch in {}: ")
    transactions_by_branch.show()
    
# Call get_branch_transaction_info() function with user input
get_branch_transaction_info(state)


Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH,   NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): NY
Number of transactions by branch in {}: 
+-----------+----------------+-----------+
|BRANCH_CODE|NUM_TRANSACTIONS|TOTAL_VALUE|
+-----------+----------------+-----------+
|        178|             398|   18968.33|
|        160|             400|   20257.59|
|          3|             431|   21749.91|
|          8|             431|   21809.74|
|         66|             412|   21723.45|
|        175|             393|   20524.62|
|        135|             383|   20710.99|
|         46|             445|   23507.66|
|         93|             416|   21698.67|
|         16|             434|   22766.42|
+-----------+----------------+-----------+



In [79]:
state = input("Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH,\
NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): ")

def get_branch_transaction_info(state):
    trans_by_branch_state = spark.sql(f"SELECT COUNT(bcr.BRANCH_STATE) as Branches_in_{state}, \
    ROUND(SUM(bcr.TRANSACTION_VALUE),2) as Total_Transactions_Value \
    FROM bcr \
    WHERE bcr.BRANCH_STATE = '{state}'")
    trans_by_branch_state.show()

get_branch_transaction_info(state)


Please choose a state from the following list (NY, GA, PA, FL, IL, MI, MD, NJ, CA, OH,NC, VA, MA, TX, WI, SC, MN, MS, KY, WA, IA, CT, IN, MT, AR, AL): MA
+--------------+------------------------+
|Branches_in_MA|Total_Transactions_Value|
+--------------+------------------------+
|           856|                43131.18|
+--------------+------------------------+



In [80]:
customer_ssn = input("Please enter your social security number: ")

def get_customer_information(customer_ssn):
    customer_info = spark.sql(f"SELECT * \
    FROM customers \
    WHERE customers.SSN = '{customer_ssn}'")
    customer_info.show()

get_customer_information(customer_ssn)


Please enter your social security number): 123459758
+----------------+---------+-------------+------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|  Credit_card_no|CUST_CITY| CUST_COUNTRY|        CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+---------+-------------+------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|4210653310195948|NewBerlin|United States|EHardy@example.com|   1243215|        WI|   53151|   Eugenio|    Hardy|2018-04-21 12:49:02|      trina|123459758|253,Country Club ...|
+----------------+---------+-------------+------------------+----------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+



##format
-- Create or replace view for `experienced_employee` with comments.
> CREATE OR REPLACE VIEW experienced_employee
    (id COMMENT 'Unique identification number', Name)
    COMMENT 'View for experienced employees'
    AS SELECT id, name
         FROM all_employee
        WHERE working_years > 5;

-- Create a temporary view `subscribed_movies`.
> CREATE TEMPORARY VIEW subscribed_movies
    AS SELECT mo.member_id, mb.full_name, mo.movie_title
         FROM movies AS mo
         INNER JOIN members AS mb
            ON mo.member_id = mb.id;

https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-view.html


### Question global v not global temp_view

Let’s create a temporary view on top of the DataFrame object by using df.createOrReplaceTempView(). Spark SQL temporary views are session-scoped and will not be available if the session that creates it terminates. The following examples create a Person table.


// Create Temporary View/Table 
df.createOrReplaceTempView("Person")-  temporary view that is not shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view using
df.createGlobalTempView('Person'-  temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view using




https://sparkbyexamples.com/spark/spark-createorreplacetempview-explained/

// Run SQL Query
spark.sql("select firstname, lastname from Person").show()

In [None]:
df_branch.createOrReplaceTempView("branch")
df_credit.createOrReplaceTempView("credit")
df_customers.createOrReplaceTempView("customers")

can also convert to DF and join df together.

then write functions

In [None]:
df_branch_pd = df_branch.toPandas()
df_credit_pd = df_credit.toPandas()
df_customers_pd = df_customers.toPandas()


1) Used to check the existing account details of a customer.
2) Used to modify the existing account details of a customer.
3) Used to generate a monthly bill for a credit card number for a given month and year.
4) Used to display the transactions made by a customer between two dates. Order by year, month, and day in descending order.


In [None]:
df_customers_pd.columns

In [None]:
df_credit_pd.columns

In [None]:
trans_df = pd.merge(df_credit_pd, df_customers_pd, how='inner', left_on = 'CUST_SSN', right_on = 'SSN')

Find and plot which transaction type has a high rate of transactions.

Note: Take a screenshot of the graphs. 
Find and plot which state has a high number of customers.

Note: Take a screenshot of the graphs. 
Find and plot the sum of all transactions for each customer, and which customer has the highest transaction amount.
hint(use CUST_SSN).

Note: Take a screenshot of the graphs. 



## 3

In [None]:
print(df_branch.show(5))
print(df_credit.show(5))
print(df_customers.show(5))

In [None]:
pd_credit = df_credit.toPandas()
pd_credit

In [None]:
#numbers are right but overwriting??
pd_credit.groupby(['TRANSACTION_TYPE']).count()

In [None]:
#df2 = df.groupby(['Courses'])['Courses'].count()
transaction_type = pd_credit.groupby(['TRANSACTION_TYPE'])['TRANSACTION_TYPE'].count()
transaction_type

In [None]:
type(transaction_names)

In [None]:
transaction_names = pd_credit.TRANSACTION_TYPE.value_counts()
transaction_names

In [None]:
transaction_names = pd_credit.TRANSACTION_TYPE.value_counts()
types_df = transaction_names.reset_index()
types_df.columns = ['categories', 'frequency']

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))
ax = sns.barplot(x='categories', y='frequency', data=types_df, color='purple')
plt.title('Transaction Type')
plt.xlabel('Number of Transactions')
plt.ylabel('Transaction Type')
plt.ylim(6300, 7000)
ax.bar_label(ax.containers[0]);



In [None]:
pip show matplotlib

In [None]:
pd_customers = df_customers.toPandas()
pd_customers

In [None]:
states = pd_customers.groupby(['CUST_STATE'])['CUST_STATE'].count()
states

In [None]:
state_names= pd_customers.CUST_STATE.value_counts()
state_count = state_names.reset_index()
state_count.columns = ['states', 'frequency']
state_count

In [None]:
#limit 8
top_8_states = state_count.nlargest(8, ['frequency'])
top_8_states

In [None]:
top_8= state_count.sort_values(by=['frequency'], ascending=False).head(8)
top_8

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))
ax = sns.barplot(x='states', y='frequency', data=top_8_states, color='green')
#ax.set_title('Transaction Type')
#ax.set_xlabel('Number of Transactions')
#ax.set_ylabel('Transaction Type')
#ax.set_ylim(6300, 7000)
plt.title('Top 8 States with the Highest Customers')
plt.xlabel('States')
plt.ylabel('Frequency')
plt.ylim(25, 125)
ax.bar_label(ax.containers[0]);

In [None]:
pd_credit

In [None]:
customer_transaction = pd_credit.groupby(['CUST_SSN'])['TRANSACTION_VALUE'].sum()
customer_transaction

In [None]:
individual_cust = customer_transaction.reset_index()
individual_cust.sort_values(by=['TRANSACTION_VALUE'], ascending=False, inplace=True)
individual_cust

In [None]:
top_1_customer = individual_cust.head(1)
top_1_customer

In [None]:
top_15_customers= individual_cust.nlargest(15, ['TRANSACTION_VALUE'])
top_15_customers

In [None]:

fig, ax = plt.subplots(figsize=(15, 10))
ax = sns.barplot(x='CUST_SSN', y='TRANSACTION_VALUE', data=top_15_customers, color='purple')
#ax.set_title('Transaction Type')
#ax.set_xlabel('Number of Transactions')
#ax.set_ylabel('Transaction Type')
#ax.set_ylim(6300, 7000)
plt.title('Top 15 Customers with the Highest Total Transaction Amounts')
plt.xlabel('Customer by SSN')
plt.ylabel('Total Transaction Amount')
plt.ylim(4700, 5900)
ax.set_xticklabels(ax.get_xticklabels(), rotation=45)
ax.bar_label(ax.containers[0]);

## 4

In [None]:
url = 'https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json'
response = requests.get(url)
data = response.json()
data

In [None]:
print(response.status_code)

In [None]:
spark = SparkSession.builder.master("local[*]").appName("Loan").getOrCreate()

In [None]:
type(data)

In [None]:
loan = spark.sparkContext.parallelize(data)
df_loan = spark.read.json(loan)
df_loan.show(5)

In [None]:
type(df_loan)


Create a Python program to GET (consume) data from the above API endpoint for the loan application dataset.
Find the status code of the above API endpoint.

Hint: status code could be 200, 400, 404, 401.
Once Python reads data from the API, utilize PySpark to load data into RDBMS(SQL). The table name should be CDW-SAPP_loan_application in the database.

Note: Use the “creditcard_capstone” database.



In [None]:
df_loan.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone") \
  .option("dbtable", "creditcard_capstone.CDW-SAPP_loan_application ") \
  .option("user", "root") \
  .option("password", "pass1234") \
  .save()

## 5

In [None]:
pd_loan = df_loan.toPandas()
pd_loan.head()

In [None]:
pd_loan['Application_Status'] = pd_loan['Application_Status'].replace(['N', 'Y'], ['Rejected', 'Approved'])
pd_loan.head()

In [None]:
application_self_employed_approval = pd_loan.groupby(['Application_Status'])['Self_Employed'].count()
application_self_employed_approval

In [None]:
application_self_employed_approval1 = pd_loan.groupby(['Self_Employed'])['Application_Status'].count()
application_self_employed_approval1

In [None]:
fig_checking = pd_loan.groupby(['Application_Status', 'Self_Employed'])['Application_Status'].count()
fig_checking

In [None]:
pd_self_employed = pd_loan.loc[(pd_loan.Self_Employed == 'Yes')]
emp_yes = pd_self_employed.groupby(['Application_Status'])['Application_Status'].count()
emp_yes

In [None]:
pd_self_employed2 = pd_loan.loc[(pd_loan.Application_Status == 'Approved')]
emp_yes2 = pd_self_employed2.groupby(['Self_Employed'])['Self_Employed'].count()
emp_yes2

In [None]:
app_response = pd_self_employed.Application_Status.value_counts()
self_emp = app_response.reset_index()
self_emp.columns = ['Status', 'count']

In [None]:
app_response2 = pd_self_employed2.Self_Employed.value_counts()
self_emp2 = app_response2.reset_index()
self_emp2.columns = ['employed_type', 'count']
self_emp2['employed_type'] = self_emp2['employed_type'].replace(['No', 'Yes'], ['Employed_by_others', 'Self_employed'])

In [None]:
self_emp

In [None]:
self_emp2

In [None]:

plt.pie(self_emp['count'], labels=self_emp['Status'], autopct='%1.1f%%')
plt.title('Of all the self-employed, percentage approval')
plt.axis('equal')
plt.show();

In [None]:

plt.pie(self_emp2['count'], labels=self_emp2['employed_type'], autopct='%1.1f%%')
plt.title('Of all applications approved, percentage self-employed')
plt.axis('equal')
plt.show();



In [None]:
fig, ax = plt.subplots(figsize=(10, 15))
ax = sns.barplot(x='Status', y='count', data=self_emp, color='purple')
plt.title('Application Status by Self-Employed')
plt.xlabel('Application Status')
plt.ylabel('Number of Applications')
ax.bar_label(ax.containers[0]);

In [None]:
fig, ax = plt.subplots(figsize=(10, 15))
ax = sns.barplot(x='employed_type', y='count', data=self_emp2, color='purple')
plt.title('Application Status by Self-Employed')
plt.xlabel('Self Employed Status')
plt.ylabel('Number of Applications')
ax.bar_label(ax.containers[0]);

In [None]:
fig_checking2 = pd_loan.groupby(['Gender', 'Married', 'Application_Status'])['Application_Status'].count()
fig_checking2

In [None]:
pd_married_male = pd_loan.loc[(pd_loan.Gender == 'Male') & (pd_loan.Married == 'Yes')]
pd_married_men = pd_married_male.groupby(['Application_Status'])['Application_Status'].count()
pd_married_men


In [None]:
app_status_married_men = pd_married_male.Application_Status.value_counts()
married_men = app_status_married_men.reset_index()
married_men.columns = ['Status', 'count']

In [None]:
plt.pie(married_men['count'], labels=married_men['Status'], autopct='%1.1f%%')
plt.title('Married Men Application Approval')
plt.axis('equal')
plt.show();

In [None]:
pd_credit.columns

In [None]:
print(pd_credit.sort_values(by=['TIMEID'], ascending=False).head(2))
print(pd_credit.sort_values(by=['TIMEID'], ascending=True).head(2))

In [None]:
monthly_transactions = pd_credit.groupby(['MONTH'])['TRANSACTION_VALUE'].sum()
monthly_transactions

In [None]:
month_count = monthly_transactions.reset_index()
month_count.columns = ['month', 'total_transactions']
month_count

In [None]:
month_count.dtypes

In [None]:
month_count['month'] = month_count['month'].apply(str)
month_count['month'] = month_count['month'].replace(['1','2','3','4','5','6','7','8','9','10','11','12'], 
                          ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sept','Oct','Nov','Dec'])
#month_count.loc[:,'total_transactions'] ='$'+ month_count['total_transactions'].map('{:,.0f}'.format)
month_count

In [None]:
top_3_months = month_count.sort_values(by=['total_transactions'], ascending=False).head(3)
top_3_months

In [None]:
fig, ax = plt.subplots(figsize=(15, 10))
ax = sns.barplot(x='month', y='total_transactions', data=top_3_months, color='green')
plt.title('Top 3 Months with the Highest Total Transactions')
plt.xlabel('Months')
plt.ylabel('transaction amount')
plt.ylim(201000, 203000)
ax.bar_label(ax.containers[0]);

Find and plot which branch processed the highest total dollar value of healthcare transactions.

In [None]:
pd_credit.head(5)

In [None]:
pd_credit.info()

In [None]:
pd_branch = df_branch.toPandas()
pd_branch.head()

In [None]:
healthcare_trans_pd = pd_credit.loc[(pd_credit.TRANSACTION_TYPE == 'Healthcare')]
healthcare_by_branch = healthcare_trans_pd.groupby(['BRANCH_CODE'])['TRANSACTION_VALUE'].sum()
healthcare_by_branch

In [None]:
healthcare_total_df = healthcare_by_branch.reset_index()
healthcare_total_df

In [None]:
healthcare_tran_allcolumns_df = pd.merge(healthcare_total_df, pd_branch, on ='BRANCH_CODE')
healthcare_tran_allcolumns_df.head(5)

In [None]:
healthcare_tran_df = healthcare_tran_allcolumns_df[['BRANCH_CODE', 'TRANSACTION_VALUE', 'BRANCH_NAME']].copy()
healthcare_tran_df

In [None]:
pd_branch['BRANCH_NAME'].value_counts()

In [None]:
top_branch_for_healthcare_trans = healthcare_total_df.sort_values(by=['TRANSACTION_VALUE'], ascending=False).head(1)
top_branch_for_healthcare_trans

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))
ax = sns.barplot(x='BRANCH_CODE', y='TRANSACTION_VALUE', data=top_branch_for_healthcare_trans, color='purple')
plt.title('Top Branch for Healthcare transactions')
plt.xlabel('Branch')
plt.ylabel('Amount')
plt.ylim(4300, 4400)
ax.bar_label(ax.containers[0]);

In [None]:
spark.stop()

Find and plot the percentage of applications approved for self-employed applicants.
Note: Take a screenshot of the graph. 
Find the percentage of rejection for married male applicants.
Note: Take a screenshot of the graph.
Find and plot the top three months with the largest transaction data.
Note: Take a screenshot of the graph. 
Find and plot which branch processed the highest total dollar value of healthcare transactions.
Note: Take a screenshot of the graph. 

