In [1]:
# Disabled draft of an interactive menu; nothing here runs while commented out.
# NOTE(review): if this is re-enabled, `input()` returns a str, so the
# comparison `id==1` would never be true, and `id` shadows the Python built-in.
# while True:
#     id=input('''What do you want to analyze?
#             1. Transactions made by customers in a given zipcode for a given month and year.
#             2. count and total values of given transaction type
#             3. count and total values of transactions for branches in given state
#           ''')
#     if id==1:
#         zipcode=input('Enter the zipcode of location you want to analyze: ')
#         month=input('Enter month number: ')
#         year=input('Enter year')




In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create (or reuse) the notebook's Spark session, running locally on one worker thread.
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('Credit Card Management System')
    .getOrCreate()
)

In [3]:
# Load the customer table from MySQL over JDBC.
# The login is taken from the environment when CREDITCARD_DB_USER /
# CREDITCARD_DB_PASSWORD are set, so real credentials need not live in the
# notebook; the fallbacks keep the original local-development login working.
df_cust = (
    spark.read.format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
    .option("dbtable", "cdw_sapp_customer")
    .option("user", os.environ.get("CREDITCARD_DB_USER", "root"))
    .option("password", os.environ.get("CREDITCARD_DB_PASSWORD", "admin"))
    .load()
)
# Register the DataFrame under the view name that the SQL cells query.
df_cust.createOrReplaceTempView('df_cust')

In [4]:
# Load the branch table from MySQL over JDBC.
# The login is taken from the environment when CREDITCARD_DB_USER /
# CREDITCARD_DB_PASSWORD are set, so real credentials need not live in the
# notebook; the fallbacks keep the original local-development login working.
df_branch = (
    spark.read.format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
    .option("dbtable", "cdw_sapp_branch")
    .option("user", os.environ.get("CREDITCARD_DB_USER", "root"))
    .option("password", os.environ.get("CREDITCARD_DB_PASSWORD", "admin"))
    .load()
)
# Register the DataFrame under the view name that the SQL cells query.
df_branch.createOrReplaceTempView('df_branch')

In [5]:
# Load the credit-card transaction table from MySQL over JDBC.
# The login is taken from the environment when CREDITCARD_DB_USER /
# CREDITCARD_DB_PASSWORD are set, so real credentials need not live in the
# notebook; the fallbacks keep the original local-development login working.
df_credit = (
    spark.read.format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
    .option("dbtable", "cdw_sapp_credit_card")
    .option("user", os.environ.get("CREDITCARD_DB_USER", "root"))
    .option("password", os.environ.get("CREDITCARD_DB_PASSWORD", "admin"))
    .load()
)
# Register the DataFrame under the view name that the SQL cells query.
df_credit.createOrReplaceTempView('df_credit')

In [6]:
# Print the customer table's column names, types and nullability.
df_cust.printSchema()

root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [7]:
# List the distinct LAST_UPDATED values present in the customer table.
df_cust.select('last_updated').distinct().show()

+-------------------+
|       last_updated|
+-------------------+
|2018-04-21 12:49:02|
+-------------------+



In [8]:
# Preview the first two rows of the branch table.
df_branch.show(2)

+-----------+------------+-----------------+-----------+------------+----------+-------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------+------------+----------+-------------+-------------------+
|          1|Example Bank|     Bridle Court|  Lakeville|          MN|     55044|(123)456-5276|2018-04-18 16:51:47|
|          2|Example Bank|Washington Street|    Huntley|          IL|     60142|(123)461-8993|2018-04-18 16:51:47|
+-----------+------------+-----------------+-----------+------------+----------+-------------+-------------------+
only showing top 2 rows



In [9]:
# Preview the first two rows of the credit-card transaction table.
df_credit.show(2)

+--------------+----------------+-----------------+----------------+-----------+---------+----------+
|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|  CREDIT_CARD_NO|BRANCH_CODE| CUST_SSN|    TIMEID|
+--------------+----------------+-----------------+----------------+-----------+---------+----------+
|             1|       Education|             78.9|4210653349028689|        114|123459988|2018-02-14|
|             2|   Entertainment|            14.24|4210653349028689|         35|123459988|2018-03-20|
+--------------+----------------+-----------------+----------------+-----------+---------+----------+
only showing top 2 rows



In [10]:
# Project the year out of TIMEID. No .show() is called, so the cell displays
# only the resulting DataFrame's repr (column name and type), not any rows.
df_credit.select(year('timeid'))

DataFrame[year(timeid): int]

## TRANSACTION DETAILS
1)    Used to display the transactions made by customers living in a given zip code for a given month and year. Order by day in descending order.

In [11]:
# Transactions made in March 2018 by customers living in zip code 11001,
# ordered by day of month, latest first.
query2 = (
    "select * from df_credit join df_cust on df_cust.ssn=df_credit.cust_ssn "
    "where df_cust.cust_zip='11001' "
    "and month(timeid)=3 and year(timeid)= 2018 order by day(timeid) desc"
)
df_trans = spark.sql(query2)
df_trans.show()

+--------------+----------------+-----------------+----------------+-----------+---------+----------+---------+----------+-----------+----------+-------------------+----------+----------+-------------+--------+-------------+--------------------+-------------------+
|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|  CREDIT_CARD_NO|BRANCH_CODE| CUST_SSN|    TIMEID|      SSN|FIRST_NAME|MIDDLE_NAME| LAST_NAME|FULL_STREET_ADDRESS| CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+--------------+----------------+-----------------+----------------+-----------+---------+----------+---------+----------+-----------+----------+-------------------+----------+----------+-------------+--------+-------------+--------------------+-------------------+
|         17384|      Healthcare|            51.29|4210653395351413|         81|123456703|2018-03-27|123456703|     Selma|      elroy|        Le|4th Street West,801|FloralPark|        NY|United States| 

2)    Used to display the number and total values of transactions for a given type.
3)    Used to display the total number and total values of transactions for branches in a given state.

In [12]:
# Number of 'Education' transactions and their total value, rounded to cents.
spark.sql(
    "select count(*),round(sum(transaction_value),2) as total_value "
    "from df_credit where transaction_type='Education'"
).show()

+--------+-----------+
|count(1)|total_value|
+--------+-----------+
|    6638|  337980.07|
+--------+-----------+



In [13]:
# Number and total value of transactions handled by branches located in IL.
query = (
    "select count(*),round(sum(transaction_value),2) as total_Value "
    "from df_credit join df_branch "
    "on df_credit.branch_code= df_branch.branch_code where branch_state='IL'"
)
spark.sql(query).show()

+--------+-----------+
|count(1)|total_Value|
+--------+-----------+
|    2439|   124708.3|
+--------+-----------+



## CUSTOMER DETAILS
1) Used to check the existing account details of a customer.
2) Used to modify the existing account details of a customer.
3) Used to generate a monthly bill for a credit card number for a given month and year.
4) Used to display the transactions made by a customer between two dates. Order by year, month, and day in descending order

In [14]:
# Every transaction of the customer with SSN 123456703, joined with the
# customer's own record and the record of the branch that handled it.
query = (
    "select * from df_credit join df_cust on df_cust.ssn=df_credit.cust_ssn "
    "join df_branch on df_credit.branch_code=df_branch.branch_code"
    " where df_cust.ssn=123456703"
)
spark.sql(query).show()

+--------------+----------------+-----------------+----------------+-----------+---------+----------+---------+----------+-----------+---------+-------------------+----------+----------+-------------+--------+-------------+---------------+-------------------+-----------+------------+----------------+------------+------------+----------+-------------+-------------------+
|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|  CREDIT_CARD_NO|BRANCH_CODE| CUST_SSN|    TIMEID|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|FULL_STREET_ADDRESS| CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|   CUST_PHONE|     CUST_EMAIL|       LAST_UPDATED|BRANCH_CODE| BRANCH_NAME|   BRANCH_STREET| BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP| BRANCH_PHONE|       LAST_UPDATED|
+--------------+----------------+-----------------+----------------+-----------+---------+----------+---------+----------+-----------+---------+-------------------+----------+----------+-------------+--------+-------------+---------------+---------------