Initialization for Google Colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install -q pyspark
import os
import pyspark
import findspark
import numpy as np
import pandas as pd
from datetime import date,timedelta,datetime
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from google.colab import drive
from pyspark.sql.types import *
from pyspark.sql.window import Window
# Point the JVM and Spark lookups at the locations installed above.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.1-bin-hadoop3.2",
})
findspark.init()
findspark.find()

# One local-mode session named "Colab", with the Spark UI on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

Mounting Google Drive

In [None]:
# Mount Google Drive under /content/gdrive/, forcing a remount if it is already mounted.
drive.mount('/content/gdrive/',force_remount=True)
# Folder prefix used by the later cells both to read dataset.csv and to write the
# generated table CSV files (paths are built by plain string concatenation, hence
# the trailing slash).
root_path="/content/gdrive/MyDrive/Project/Big_Data_and_IoT_project/"

Reading the provided .csv file

In [None]:
# Load the provided CSV (comma separated, first row is the header).
# The CSV reader option is spelled `inferSchema`; the earlier `inferScheme`
# spelling is not a recognised option, so it was silently ignored and every
# column came back as a string.
# NOTE(review): with inference enabled, all-digit identifier columns are read as
# numbers and lose any leading zeros -- supply an explicit schema if that matters.
dataset=spark.read.load(root_path+"dataset.csv",format="csv",sep=",",inferSchema=True,header=True)
dataset.show()

Checking the provided data

In [None]:
# Print Spark's per-column summary statistics for the loaded data.
dataset.summary().show()

Renaming the columns for further access

In [None]:
# Replace spaces and dots in every column name with underscores (later cells
# access columns by these sanitised names), then drop every row holding a null.
sanitized_names=[name.replace(' ','_').replace('.','_') for name in dataset.schema.names]
dataset=dataset.toDF(*sanitized_names).na.drop()

Checking existing schema

In [None]:
# Print each StructField on its own line, a blank separator line, and then
# Spark's tree rendering of the same schema.
for field in dataset.schema.fields:
    print(field)
print()
dataset.printSchema()


Additional imports for data processing

In [None]:
!pip install -q random-address
import string
import random
import random_address
import math
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from random import randrange
from datetime import timedelta
from datetime import datetime

Helper methods for generating certain data

In [None]:
def getRandomAddress():
    """Return one random address flattened into a single ", "-separated string.

    The record returned by ``random_address.real_random_address()`` is walked in
    its own field order.  Non-empty text fields are kept; walking stops at the
    first field that is not text (e.g. a nested coordinates entry), so nothing
    after that field is included.

    Returns:
        str: the kept fields joined with ", " (empty string if none were kept).
    """
    record=random_address.real_random_address()
    parts=[]
    for value in record.values():
        # An explicit type test replaces the old bare ``except`` around a string
        # concatenation, which also swallowed unrelated errors and interrupts.
        if not isinstance(value,str):
            break
        if value:
            parts.append(value)
    return ", ".join(parts)

def getRandomLatitude():
    """Return the 'lat' coordinate of a random address drawn for state code 'CT'."""
    coordinates=random_address.real_random_address_by_state('CT')['coordinates']
    return coordinates['lat']

def getRandomLongitude():
    """Return the 'lng' coordinate of a random address drawn for state code 'CT'."""
    coordinates=random_address.real_random_address_by_state('CT')['coordinates']
    return coordinates['lng']

def random_date(start,end):
    """Return a random datetime in the half-open range [start, end).

    The offset is a whole number of seconds; any microsecond part of the span
    between the two datetimes is ignored.
    """
    span=end-start
    span_seconds=span.days*24*60*60+span.seconds
    offset=timedelta(seconds=randrange(span_seconds))
    return start+offset

def DoB():
    """Return a random datetime from 12/16/1971 12:00 AM up to 12/31/1990 11:59 PM.

    The two bounds are handed to ``random_date``, which picks the value.
    """
    fmt='%m/%d/%Y %I:%M %p'
    earliest=datetime.strptime('12/16/1971 12:00 AM',fmt)
    latest=datetime.strptime('12/31/1990 11:59 PM',fmt)
    return random_date(earliest,latest)

def DoC():
    """Return a random datetime from 12/16/2020 12:00 AM up to 12/31/2021 11:59 PM.

    The two bounds are handed to ``random_date``, which picks the value.
    """
    fmt='%m/%d/%Y %I:%M %p'
    earliest=datetime.strptime('12/16/2020 12:00 AM',fmt)
    latest=datetime.strptime('12/31/2021 11:59 PM',fmt)
    return random_date(earliest,latest)

def DoU(s):
    """Return a random datetime between midnight of ``s`` and 04/09/2022 11:59 PM.

    Args:
        s: date text whose '-'-separated parts are year, month, day in that order.
    """
    parts=s.split('-')
    fmt='%m/%d/%Y %I:%M %p'
    # Re-order year-month-day into the month/day/year layout that ``fmt`` parses.
    lower_text=f"{parts[1]}/{parts[2]}/{parts[0]} 12:00 AM"
    lower=datetime.strptime(lower_text,fmt)
    upper=datetime.strptime('04/09/2022 11:59 PM',fmt)
    return random_date(lower,upper)

def DoV(s):
    """Return a random datetime between a shifted copy of ``s`` and 04/09/2026 11:59 PM.

    The lower bound is midnight of ``s`` with the last digit of its year replaced
    by '4' (for example 2021 becomes 2024).

    Args:
        s: date text whose '-'-separated parts are year, month, day in that order.
    """
    parts=s.split('-')
    fmt='%m/%d/%Y %I:%M %p'
    shifted_year=parts[0][:-1]+'4'
    lower_text=f"{parts[1]}/{parts[2]}/{shifted_year} 12:00 AM"
    lower=datetime.strptime(lower_text,fmt)
    upper=datetime.strptime('04/09/2026 11:59 PM',fmt)
    return random_date(lower,upper)

def DoL(s):
    """Return a random datetime between midnight of ``s`` and 04/09/2022 11:59 PM.

    Args:
        s: date text whose '-'-separated parts are year, month, day in that order.
    """
    parts=s.split('-')
    fmt='%m/%d/%Y %I:%M %p'
    # Re-order year-month-day into the month/day/year layout that ``fmt`` parses.
    lower_text=f"{parts[1]}/{parts[2]}/{parts[0]} 12:00 AM"
    lower=datetime.strptime(lower_text,fmt)
    upper=datetime.strptime('04/09/2022 11:59 PM',fmt)
    return random_date(lower,upper)

def getActTemp(st):
    """Return '1' when ``st`` converts to an integer greater than 2024, else '0'."""
    return '1' if int(st)>2024 else '0'

Lambda functions for generating data

In [None]:
def _nondeterministicUdf(fn,returnType):
    """Wrap ``fn`` as a Spark UDF flagged as nondeterministic.

    Spark treats UDFs as deterministic by default and may then eliminate
    duplicate invocations or invoke the function more often than it appears in
    the query.  For random generators that lets a column and the columns derived
    from it be computed from different draws, so every random UDF is flagged.
    """
    return udf(fn,returnType).asNondeterministic()

def _addressField(address,indexFromEnd,minParts=None):
    """Return one comma-separated field of ``address``, counted from the end.

    Args:
        address: text whose fields are separated by commas.
        indexFromEnd: 1 for the last field, 2 for the one before it, and so on.
        minParts: minimum number of fields required (defaults to ``indexFromEnd``).

    Returns:
        The stripped field, or None when ``address`` is empty or too short.
    """
    if minParts is None:
        minParts=indexFromEnd
    parts=[part.strip() for part in address.split(',')] if address else []
    return parts[-indexFromEnd] if len(parts)>=minParts else None

# --- Random value generators (flagged nondeterministic) ---
genUserID=_nondeterministicUdf(lambda :''.join(random.choices(string.ascii_uppercase+string.digits,k=12)),StringType())
genDGID=_nondeterministicUdf(lambda :random.randint(1,50),IntegerType())
genLimit=_nondeterministicUdf(lambda :random.randint(10,25),IntegerType())
genDue=_nondeterministicUdf(lambda :random.uniform(100.0,2500.0),DoubleType())
genTag=_nondeterministicUdf(lambda :''.join(random.choices(string.digits,k=6)),StringType())
genRefN=_nondeterministicUdf(lambda :''.join(random.choices(string.digits,k=random.randint(1,8))),StringType())
genQty=_nondeterministicUdf(lambda :random.randint(123,1234),IntegerType())
genPTID=_nondeterministicUdf(lambda :random.randint(1,25),IntegerType())

# --- Name splitting: everything before the last word / the last word ---
genFirstName=udf(lambda x:' '.join(x.split()[0:-1]),StringType())
genLastName=udf(lambda x:x.split()[-1],StringType())

# --- Random address and coordinates ---
genAdd=_nondeterministicUdf(lambda :getRandomAddress(),StringType())
genLat=_nondeterministicUdf(lambda :getRandomLatitude(),DoubleType())
genLon=_nondeterministicUdf(lambda :getRandomLongitude(),DoubleType())

# --- Address parts ---
# getRandomAddress joins its fields with ", " and the fields themselves contain
# spaces, so splitting on whitespace picked street-name tokens.  The fields are
# therefore split on commas and read from the end of the string.
# NOTE(review): assumes the address ends with "..., city, state, postal code" and
# that "country" is meant to hold the field between city and postal code, as the
# original index order (1, 2, 3) suggests -- confirm against the address source.
# Requiring four fields for the city avoids returning the street line as the city.
genCity=udf(lambda x:_addressField(x,3,4),StringType())
genPostCode=udf(lambda x:_addressField(x,1,3),StringType())
genCountry=udf(lambda x:_addressField(x,2,3),StringType())

# --- Random dates, rendered as the date part of str(datetime) ---
genDoB=_nondeterministicUdf(lambda :str(DoB()).split()[0],StringType())
genDoC=_nondeterministicUdf(lambda :str(DoC()).split()[0],StringType())
genDoU=_nondeterministicUdf(lambda x:str(DoU(x)).split()[0],StringType())
genDoV=_nondeterministicUdf(lambda x:str(DoV(x)).split()[0],StringType())
genDoL=_nondeterministicUdf(lambda x:str(DoL(x)).split()[0],StringType())

# --- Random flags ---
genTF=_nondeterministicUdf(lambda :''.join(random.choices(['t','f'],k=1)),StringType())
genActiv=_nondeterministicUdf(lambda :''.join(random.choices(['0','1'],k=1)),StringType())

# --- Deterministic conversions ---
# Activation flag derived from the year part of a '-'-separated date string.
genActiTemp=udf(lambda x:getActTemp(x.split('-')[0]),StringType())
tint=udf(lambda x:int(x),IntegerType())

Generating extra data and adding to the provided data

In [None]:
# The three fixed ids are typed in by the user.  They are collected before the
# DataFrame is touched (same prompts, same order) so that an invalid entry cannot
# leave `dataset` half-extended.
product_type_id=int(input("Enter Product Type ID : "))
inventory_type_id=int(input("Enter Inventory Type ID : "))
agent_name_id=int(input("Enter Agent Name ID : "))

# Add every generated column to the provided data.  Columns are referenced by
# name so that columns created earlier in the chain can be used by later steps.
dataset=(
    dataset
    .withColumnRenamed("Contact","contact")
    .withColumn("contact_number",F.col("contact"))
    .withColumn("user_id",genUserID())
    .withColumn("first_name",genFirstName(F.col("Employee_Name")))
    .withColumn("last_name",genLastName(F.col("Employee_Name")))
    .withColumn("date_of_birth",genDoB())
    .withColumn("lat",genLat())
    .withColumn("lon",genLon())
    .withColumn("is_otp_verified",genTF())
    .withColumn("discount_group_id",genDGID())
    .withColumn("address",genAdd())
    .withColumn("city",genCity(F.col("address")))
    .withColumn("postcode",genPostCode(F.col("address")))
    .withColumn("country",genCountry(F.col("address")))
    .withColumn("currency",F.lit("USD"))
    .withColumn("tag",genTag())
    .withColumn("created_date",genDoC())
    .withColumn("updated_date",genDoU(F.col("created_date")))
    .withColumn("valid_until",genDoV(F.col("created_date")))
    .withColumn("activated",genActiTemp(F.col("valid_until")))
    .withColumn("last_used",genDoL(F.col("updated_date")))
    .withColumn("expire_date",F.col("valid_until"))
    .withColumn("reference_number",genRefN())
    .withColumn("employee_code",tint(F.col("Emp_ID")))
    .withColumn("unique_user_id",tint(F.col("SL_No_")))
    .withColumn("id",tint(F.col("SL_No_")))
    .withColumn("quantity",genQty())
    .withColumn("product_type_id",F.lit(product_type_id))
    .withColumn("rfid_id",F.col("RFID_Card_Number"))
    .withColumn("credit_type",F.lit("NORMAL"))
    .withColumn("per_day",genLimit())
    .withColumn("per_week",F.col("per_day")*7)
    .withColumn("per_month",F.col("per_day")*30)
    .withColumn("inventory_type_id",F.lit(inventory_type_id))
    .withColumn("fixed_credit_limit",F.lit(50))
    .withColumn("credit_limit",F.lit(30))
    .withColumn("due_amount",genDue())
    .withColumn("agent_name_id",F.lit(agent_name_id))
)

# DataFrames are lazy: without materialising the result, every later action
# (each show() and each table export) would re-run the random generators and see
# different user ids, dates and amounts.  Persist and force one evaluation so all
# downstream tables are cut from the same generated rows.
dataset=dataset.persist()
dataset.count()

dataset.show()
dataset.printSchema()

Creating Schema based on the table definitions

In [None]:
# Column layout of the users_personalinfo table; every column is nullable.
schema_users_personalinfo=StructType([
    StructField(column_name,column_type,True)
    for column_name,column_type in (
        ("user_id",StringType()),
        ("first_name",StringType()),
        ("last_name",StringType()),
        ("date_of_birth",StringType()),
        ("contact_number",StringType()),
        ("address",StringType()),
        ("lat",DoubleType()),
        ("lon",DoubleType()),
        ("created_date",StringType()),
        ("updated_date",StringType()),
        ("is_otp_verified",StringType()),
        ("discount_group_id",IntegerType()),
    )
])

In [None]:
# Column layout of the users_billinginfo table; every column is nullable.
schema_users_billinginfo=StructType([
    StructField(column_name,column_type,True)
    for column_name,column_type in (
        ("user_id",StringType()),
        ("address",StringType()),
        ("city",StringType()),
        ("postcode",IntegerType()),
        ("country",StringType()),
        ("contact",StringType()),
        ("currency",StringType()),
        ("created_date",StringType()),
        ("updated_date",StringType()),
    )
])

In [None]:
# Column layout of the users_rfidinfo table; every column is nullable.
schema_users_rfidinfo=StructType([
    StructField(column_name,column_type,True)
    for column_name,column_type in (
        ("user_id",StringType()),
        ("tag",StringType()),
        ("activated",StringType()),
        ("created_date",StringType()),
        ("updated_date",StringType()),
        ("valid_until",StringType()),
        ("last_used",StringType()),
        ("reference_number",StringType()),
        ("employee_code",IntegerType()),
        ("unique_user_id",IntegerType()),
    )
])

In [None]:
# Column layout of the users_rfiditem table; every column is nullable.
schema_users_rfiditem=StructType([
    StructField(column_name,column_type,True)
    for column_name,column_type in (
        ("id",IntegerType()),
        ("quantity",IntegerType()),
        ("created_date",StringType()),
        ("updated_date",StringType()),
        ("product_type_id",IntegerType()),
        ("rfid_id",StringType()),
        ("credit_type",StringType()),
        ("expire_date",StringType()),
    )
])

In [None]:
# Column layout of the users_productlimit table; every column is nullable.
schema_users_productlimit=StructType([
    StructField(column_name,column_type,True)
    for column_name,column_type in (
        ("id",IntegerType()),
        ("per_day",IntegerType()),
        ("per_week",IntegerType()),
        ("per_month",IntegerType()),
        ("created_date",StringType()),
        ("updated_date",StringType()),
        ("inventory_type_id",IntegerType()),
        ("user_id",StringType()),
    )
])

In [None]:
# Column layout of the users_postpaidrfid table; every column is nullable.
schema_users_postpaidrfid=StructType([
    StructField(column_name,column_type,True)
    for column_name,column_type in (
        ("id",IntegerType()),
        ("fixed_credit_limit",IntegerType()),
        ("credit_limit",IntegerType()),
        ("due_amount",DoubleType()),
        ("created_date",StringType()),
        ("updated_date",StringType()),
        ("agent_name_id",IntegerType()),
        ("rfid_id",StringType()),
    )
])

In [None]:
# Column layout of the users_checkout_limit table; every column is nullable.
schema_users_checkout_limit=StructType([
    StructField(column_name,column_type,True)
    for column_name,column_type in (
        ("user_id",StringType()),
        ("per_day",IntegerType()),
        ("per_week",IntegerType()),
        ("per_month",IntegerType()),
        ("created_date",StringType()),
        ("updated_date",StringType()),
    )
])

Functions to create, save and show tables

In [None]:
def saveCSV_createTable(cur_schema,name):
    """Export one table from the global ``dataset`` to CSV and read it back typed.

    Args:
        cur_schema: StructType whose field names pick the columns to export and
            whose types are applied when the file is read back.
        name: table name; the file is written to ``root_path + name + ".csv"``.

    Returns:
        The Spark DataFrame loaded from the written CSV using ``cur_schema``.
    """
    csv_path=root_path+name+".csv"
    wanted_columns=list(cur_schema.names)
    # Selecting the wanted columns already leaves every other column behind, so
    # no separate drop step is needed.
    exported=dataset.select(*wanted_columns).toPandas()
    exported.to_csv(csv_path,header=True,index=False)
    reader=spark.read.format("csv").option("header",True).schema(cur_schema)
    return reader.load(csv_path)

def showTable(cur,name):
    """Print ``name``, then the rows and the schema of the table ``cur``.

    Args:
        cur: object providing ``show()`` and ``printSchema()`` (the callers in
            this notebook pass the tables returned by saveCSV_createTable).
        name: label printed before the table output.
    """
    print(name)
    cur.show()
    cur.printSchema()


Creating and saving tables based on the given table definitions using created schema

In [None]:
# Build one table per schema: each call writes "<name>.csv" under root_path from
# the matching columns of `dataset` and keeps the DataFrame read back from that
# file under a variable named after the table.
users_personalinfo=saveCSV_createTable(schema_users_personalinfo,"users_personalinfo")
users_billinginfo=saveCSV_createTable(schema_users_billinginfo,"users_billinginfo")
users_rfidinfo=saveCSV_createTable(schema_users_rfidinfo,"users_rfidinfo")
users_rfiditem=saveCSV_createTable(schema_users_rfiditem,"users_rfiditem")
users_productlimit=saveCSV_createTable(schema_users_productlimit,"users_productlimit")
users_postpaidrfid=saveCSV_createTable(schema_users_postpaidrfid,"users_postpaidrfid")
users_checkout_limit=saveCSV_createTable(schema_users_checkout_limit,"users_checkout_limit")

The created tables

In [None]:
# Display every created table, in creation order, under its own name.
for table,label in (
    (users_personalinfo,"users_personalinfo"),
    (users_billinginfo,"users_billinginfo"),
    (users_rfidinfo,"users_rfidinfo"),
    (users_rfiditem,"users_rfiditem"),
    (users_productlimit,"users_productlimit"),
    (users_postpaidrfid,"users_postpaidrfid"),
    (users_checkout_limit,"users_checkout_limit"),
):
    showTable(table,label)

Modules used

In [None]:
import types

def imports():
    """Yield the ``__name__`` of every module object bound in this namespace's globals."""
    yield from (
        obj.__name__
        for obj in globals().values()
        if isinstance(obj, types.ModuleType)
    )

list(imports())