<a href="https://colab.research.google.com/github/rickjhee/KommatiParaABN/blob/development/Exercise_ABN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## ABN Exercise for KommatiPara


In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 68kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 16.6MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=7b32cc1e082b6ec7a8dd4e8875164213148e3575f7a10c13cc4e00b869d55c42
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [50]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

from pyspark.sql.functions import col
import logging
import os

logging.basicConfig(filename='/application.log', encoding='utf-8', level=logging.DEBUG)

In [51]:
# Download the datasets from the Github repository
logging.info('Reading datasets from Github')
!wget --no-check-certificate \
    https://raw.githubusercontent.com/rickjhee/KommatiParaABN/main/dataset_one.csv \
    -O /tmp/dataset_one.csv

!wget --no-check-certificate \
    https://raw.githubusercontent.com/rickjhee/KommatiParaABN/main/dataset_two.csv \
    -O /tmp/dataset_two.csv

--2021-05-18 09:30:30--  https://raw.githubusercontent.com/rickjhee/KommatiParaABN/main/dataset_one.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 52041 (51K) [text/plain]
Saving to: ‘/tmp/dataset_one.csv’


2021-05-18 09:30:30 (4.77 MB/s) - ‘/tmp/dataset_one.csv’ saved [52041/52041]

--2021-05-18 09:30:30--  https://raw.githubusercontent.com/rickjhee/KommatiParaABN/main/dataset_two.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64985 (63K) [text/plain]
Saving to: ‘/tmp/dataset_two.csv’


2021-05-18 09:30:30 (5.51 MB/s) 

In [53]:
def perform_data_operations(dataset_1, dataset_2):
  """
  Performs data operations based on several requirements 

  Inputs: dataset_1, dataset_2
  Outputs: joined dataset
  """
  # Requirement 2: Only use clients from the UK or NL
  dataset_1 = dataset_1.filter( (dataset_1.country  == "United Kingdom") | (dataset_1.country == "Netherlands") )

  # Requirement 3: Remove personal identifiable information from the first dataset, excluding emails.
  dataset_1 = dataset_1.drop('first_name', 'last_name')

  #Requirement 4: Remove credit card number from the second dataset.
  dataset_2 = dataset_2.drop('cc_n')

  # Requirement 5: Data should be joined using the id field.
  joined = dataset_1.join(dataset_2, on='id')

  # Requirement 6:	Rename the columns for the easier readability to the business users 
  rename_dict = {'id': 'client_identifier', 'btc_a': 'bitcoin_address', 'cc_t': 'credit_card_type'}

  joined = joined.select([col(c).alias(rename_dict.get(c, c)) for c in joined.columns]) 

  return joined

In [54]:
df1 = spark.read.csv("/tmp/dataset_one.csv", header=True)
df2 = spark.read.csv("/tmp/dataset_two.csv", header=True)
# Displays the content of the DataFrame to stdout

df1.show()
df2.show()

+---+----------+----------+--------------------+--------------+
| id|first_name| last_name|               email|       country|
+---+----------+----------+--------------------+--------------+
|  1|    Feliza|    Eusden|  feusden0@ameblo.jp|        France|
|  2| Priscilla|   Le Pine|plepine1@biglobe....|        France|
|  3|    Jaimie|    Sandes|jsandes2@reuters.com|        France|
|  4|      Nari|   Dolphin|ndolphin3@cbsloca...|        France|
|  5|     Garik|     Farre|gfarre4@economist...|        France|
|  6|   Kordula|   Broodes| kbroodes5@amazon.de|        France|
|  7|     Rakel|   Ingliby|    ringliby6@ft.com| United States|
|  8|      Derk| Mattielli|dmattielli7@slide...| United States|
|  9|    Karrah|   Boshard|   kboshard8@ihg.com|        France|
| 10| Friedrich|  Kreutzer|fkreutzer9@busine...|        France|
| 11|      Conn|   Claiden| cclaidena@vimeo.com|        France|
| 12|     Karel|   Crippin| kcrippinb@google.pl|        France|
| 13| Millisent|     Joint| mjointc@stat

In [57]:
joined = perform_data_operations(df1, df2)

joined.show()

+-----------------+--------------------+--------------+--------------------+--------------------+
|client_identifier|               email|       country|     bitcoin_address|    credit_card_type|
+-----------------+--------------------+--------------+--------------------+--------------------+
|               18|rdrinanh@odnoklas...|United Kingdom|1ErM8yuF3ytzzxLy1...|      china-unionpay|
|               32|wbamfordv@t-onlin...|United Kingdom|12sxmYnPcADAXw1Yk...|             maestro|
|               33|swestallw@blinkli...|United Kingdom|1GZ7QB7GUFSWnkBHm...|          mastercard|
|               34|erosengrenx@usato...|United Kingdom|12o8zrHx6snCPbtko...|       visa-electron|
|               36|dbuckthorpz@tmall...|   Netherlands|15X53Z9B9jUNrvFpb...|diners-club-inter...|
|               62|  bbarham1p@wisc.edu|   Netherlands|16qpYVt6YAAx4JYjz...|                 jcb|
|               67|lbeavors1u@techno...|United Kingdom|12ya1ED93ApPBQRSC...|            bankcard|
|               70|f

In [60]:
#Requirement 8: Save the output in a client_data directory in the root directory of the project.
os.makedirs('/tmp/client_data', exist_ok=True)
logging.info('Saving joined dataset')
joined.toPandas().to_csv('/tmp/client_data/client_data.csv')