In [77]:
from pyspark.sql import SparkSession
import os

In [80]:
# Base directory for all file I/O in this notebook: the kernel's current
# working directory (input CSVs are read from it, output is written under it).
# NOTE(review): `dir` shadows the built-in dir(); later cells reference this
# name, so a rename must be applied to all of them together.
dir = os.path.abspath(os.curdir)

In [2]:
# Reuse the running Spark session if one exists, otherwise start one named 'pipeline'.
spark = (
    SparkSession.builder
    .appName('pipeline')
    .getOrCreate()
)

In [81]:
# Load the two raw inputs from the base directory; the first row of each CSV
# holds the column names.
# os.path.join inserts the separator of the host OS — a hard-coded backslash
# only forms a valid path on Windows.
clients = spark.read.csv(os.path.join(dir, 'dataset_one.csv'), header=True)
financials = spark.read.csv(os.path.join(dir, 'dataset_two.csv'), header=True)

In [15]:
# Countries whose clients are kept; passed to filter_country.
filter_countries = [
    'United Kingdom',
    'Netherlands',
]

In [37]:
# Mapping of current column name -> new column name, applied to the joined data.
renaming_columns = dict(
    id='client_identifier',
    btc_a='bitcoin_address',
    cc_t='credit_card_type',
)

In [20]:
def filter_country(df, filter_countries, column='country'):
    """Keep only the rows whose ``column`` value is one of ``filter_countries``.

    Args:
        df: the DataFrame to filter.
        filter_countries: the values to keep — a list, set, tuple or any other
            iterable of values, or a single string for one value.
        column: name of the column to test; defaults to ``'country'``.

    Returns:
        The filtered DataFrame returned by ``df.filter``.
    """
    # Column.isin only unpacks a list or set passed as its single argument;
    # normalise other iterables (e.g. a tuple) to a list, and wrap a bare
    # string so it is one value rather than iterated character by character.
    if isinstance(filter_countries, str):
        values = [filter_countries]
    else:
        values = list(filter_countries)
    return df.filter(df[column].isin(values))

In [25]:
def drop_column(df, column):
    """Return ``df`` with a single column removed.

    Args:
        df: the DataFrame to modify.
        column: name of the column to remove.

    Returns:
        The DataFrame produced by ``df.drop(column)``.
    """
    return df.drop(column)

In [55]:
def join_dfs(df_a, df_b, on_column, join_type='inner'):
    """Join two DataFrames on one or more key columns shared by both.

    Args:
        df_a: left DataFrame.
        df_b: right DataFrame.
        on_column: name of the key column, or a list/tuple of names for a
            composite key.
        join_type: join type passed to ``DataFrame.join`` (e.g. 'inner',
            'left'); defaults to 'inner'.

    Returns:
        The joined DataFrame.
    """
    # Wrapping a list in another list would hand join() a nested list it
    # cannot interpret, so only a bare column name gets wrapped.
    if isinstance(on_column, (list, tuple)):
        keys = list(on_column)
    else:
        keys = [on_column]
    return df_a.join(df_b, keys, join_type)

In [40]:
def rename_column(df, a, b):
    """Return ``df`` with column ``a`` renamed to ``b``.

    Args:
        df: the DataFrame to modify.
        a: current column name.
        b: new column name.

    Returns:
        The DataFrame produced by ``df.withColumnRenamed(a, b)``.
    """
    return df.withColumnRenamed(a, b)

In [21]:
# Restrict the clients to the configured countries.
clients = filter_country(df=clients, filter_countries=filter_countries)

In [27]:
# Remove the client e-mail column before joining.
clients = drop_column(df=clients, column='email')

In [33]:
# Remove the `cc_n` column from the financial data
# (presumably the credit-card number — confirm against the data dictionary).
financials = drop_column(df=financials, column='cc_n')

In [56]:
# Inner join on `id`: only ids present in both datasets are kept.
df = join_dfs(df_a=clients, df_b=financials, on_column='id', join_type='inner')

In [57]:
# Apply every old -> new column name pair from renaming_columns.
for column, new_name in renaming_columns.items():
    df = rename_column(df, column, new_name)

In [82]:
# Collect the Spark result to the driver and write it as a single CSV file
# under <base dir>/client_data/.
# NOTE(review): toPandas() pulls every row into driver memory — fine for small
# extracts; for large data prefer df.write.csv(...).
# NOTE(review): pandas writes its index as an unnamed first column by default;
# pass index=False if consumers do not expect it.
output_dir = os.path.join(dir, 'client_data')
# to_csv does not create missing parent directories, so ensure it exists.
os.makedirs(output_dir, exist_ok=True)
df.toPandas().to_csv(os.path.join(output_dir, 'dataset.csv'))

In [12]:
import logging
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime

In [13]:
def setup_logging(level=logging.INFO):
    """Return the "DailyLogger" logger, writing to a daily-rotated log file.

    The log file is opened in the current working directory and named
    ``daily_log_<YYYY_MM_DD>.log`` after the date this function is called.
    The handler rolls the file over at midnight and keeps at most 30 rotated
    backups.

    Safe to call repeatedly (e.g. when the notebook cell is re-run): a
    TimedRotatingFileHandler attached by a previous call is closed and
    replaced, so each message is written once instead of once per call.

    Args:
        level: logging level for the logger; defaults to ``logging.INFO``.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger("DailyLogger")
    logger.setLevel(level)

    # getLogger returns the same object on every call, so handlers from an
    # earlier call are still attached; without removing them each record
    # would be written once per handler.
    for existing in list(logger.handlers):
        if isinstance(existing, TimedRotatingFileHandler):
            logger.removeHandler(existing)
            existing.close()

    # NOTE(review): the base file name embeds the start date while rotation
    # appends its own suffix to it, so backupCount only prunes backups of the
    # file opened by this call. Consider a fixed base name.
    log_file = 'daily_log_{}.log'.format(datetime.now().strftime('%Y_%m_%d'))
    handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=30)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)

    return logger

In [14]:
# Fetch the "DailyLogger" logger and attach its daily-rotating file handler
# (configured in setup_logging).
logger = setup_logging()

In [8]:
# Emit a sample INFO-level record through the configured logger.
logger.log(logging.INFO, "This is an info message")

'2023_10_19'