In [1]:
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import numpy as np
import pandas_gbq
import os

# Create spark session
spark = (SparkSession
    .builder 
    .appName("spark-cleansing") 
    .getOrCreate()
    )
sc = spark.sparkContext
sc.setLogLevel("WARN")


23/07/21 17:42:10 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.100.53 instead (on interface wlp3s0)
23/07/21 17:42:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/21 17:42:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
####################################
# path file
####################################
csv_file = "/home/azril/finproj/datasets/bank_marketing.csv"

####################################
# Read csv Data
####################################
#from csv to parquet


df = (
    spark.read
    .format("csv")
    .option("sep", ";")
    .option("header", True)
    .load(csv_file)
)

                                                                                

In [3]:
rows = df.count()
cols = len(df.columns)

print(f'Dimensions of Data: {(rows,cols)}')
print(f'Rows of Data: {rows}')
print(f'Columns of Data: {cols}')

Dimensions of Data: (41188, 21)
Rows of Data: 41188
Columns of Data: 21


In [4]:
df.show(5)

+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|      job|marital|  education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 56|housemaid|married|   basic.4y|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 57| services|married|high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 37| serv

In [5]:
df_transform1 = df.withColumn("education",
                                        when(df.education.endswith('4y'), regexp_replace(df.education, 'basic.4y', 'basic')) \
                                        .when(df.education.endswith('6y'), regexp_replace(df.education, 'basic.6y', 'basic')) \
                                         .when(df.education.endswith('9y'), regexp_replace(df.education, 'basic.9y', 'basic')) \
                                         .otherwise(df.education)
                                        )

In [6]:
df_transform1.show(5)

+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|      job|marital|  education|default|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 56|housemaid|married|      basic|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 57| services|married|high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191| no|
| 37| serv

In [7]:
# 2. Some column name need to be standardized because Spark & BigQuery can't read it
df_transform2 = df_transform1.withColumnRenamed('emp.var.rate', 'emp_var_rate') \
       .withColumnRenamed('cons.price.idx', 'cons_price_idx') \
       .withColumnRenamed('cons.conf.idx', 'cons_conf_idx') \
       .withColumnRenamed('nr.employed', 'nr_employed') \
       .withColumnRenamed('default', 'credit') \
       .withColumnRenamed('y', 'subcribed')

In [8]:
df_transform2.show(5)

+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---------+
|age|      job|marital|  education| credit|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|subcribed|
+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---------+
| 56|housemaid|married|      basic|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191|       no|
| 57| services|married|high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|   

In [9]:
df_transform3 = df_transform2.na.drop("any")

In [10]:
rows = df_transform3.count()
cols = len(df_transform3.columns)

print(f'Dimensions of Data: {(rows,cols)}')
print(f'Rows of Data: {rows}')
print(f'Columns of Data: {cols}')



Dimensions of Data: (41188, 21)
Rows of Data: 41188
Columns of Data: 21


                                                                                

In [11]:
df_transform4 = df_transform3.withColumn("client_id", monotonically_increasing_id())
df_transform4 = df_transform4.select(["client_id"] + [col for col in df_transform4.columns if col != "client_id"])
df_transform4.show(5)

+---------+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---------+
|client_id|age|      job|marital|  education| credit|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|subcribed|
+---------+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---------+
|        0| 56|housemaid|married|      basic|     no|     no|  no|telephone|  may|        mon|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191|       no|
|        1| 57| services|married|high.school|unknown|     no|  no|telephone|  may|        mon|     149|       1|  999|       0|nonexistent|     

In [16]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, DateType
import datetime

# Define UDFs to map the month and day_of_week columns
month_dict = {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6, 
              'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12}

day_dict = {'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 7}  # We consider Monday as 1

udf_month = udf(lambda x: month_dict.get(x), IntegerType())
udf_day = udf(lambda x: day_dict.get(x), IntegerType())

df_transformed5 = df_transform4.withColumn("month", udf_month(col("month")))
df_transformed5 = df_transformed5.withColumn("day_of_week", udf_day(col("day_of_week")))

# Use a UDF to combine year, month, and day into a date column
def create_date(year, month, day_of_week):
    if month >= 5:
        year = 2021
    else:
        year = 2022

    return datetime.date(year, month, day_of_week)

udf_create_date = udf(create_date, DateType())

df_transformed5 = df_transformed5.withColumn('date', udf_create_date(F.lit(2021), 'month', 'day_of_week'))

# Show transformed data
df_transformed5.show()


+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---------+----------+
|client_id|age|        job| marital|          education| credit|housing|loan|  contact|month|day_of_week|duration|campaign|pdays|previous|   poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|subcribed|      date|
+---------+---+-----------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---------+----------+
|        0| 56|  housemaid| married|              basic|     no|     no|  no|telephone|    5|          1|     261|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|       5191|       no|2021-05-01|
|        1| 57|   services| married|        high.school|

                                                                                

In [13]:
df_transform5.show(5)

23/07/21 14:56:32 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)/ 1]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_79327/3061106884.py", line 8, in calculate_date
  File "/home/azril/finproj/test/lib/python3.8/site-packages/pandas/core/generic.py", line 1466, in __nonzero__
    raise ValueError(
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_79327/3061106884.py", line 8, in calculate_date
  File "/home/azril/finproj/test/lib/python3.8/site-packages/pandas/core/generic.py", line 1466, in __nonzero__
    raise ValueError(
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().


In [14]:
df_transform3_gbq = df_transform3.toPandas()

                                                                                

In [15]:
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/Archie/.google/credentials/google_credentials.json"

In [17]:
# TODO: Set project_id to your Google Cloud Platform project ID.
project_id = "data-fellowship-9-project"

# TODO: Set dataset_id to the full destination dataset ID.
table_id = 'final_project.raw_credit_card'

pandas_gbq.to_gbq(df_transform3_gbq, table_id, project_id=project_id)

100%|██████████| 1/1 [00:00<00:00, 18808.54it/s]
