# Profile matching across two sources

## Objective

Please find attached two files (`source_f.csv` and `source_l.json`), your objective is to:

1.	analyze these two datasets, both of which are company information from two different real-world sources
2.	propose as many solutions/steps as possible that: for each row in `source_f.csv`, find a matching row from `source_l.json`
3.	output a final csv file with the following two columns: `_id` and `company_id` (`_id` is from `source_f.csv` and `company_id` from `source_l.json`)

## Sample output

```
_id,company_id
9d3ae4d9-396f-44b6-8d30-0007ab49f838,24748
9aa00d4d-8ac1-4eef-827a-0c9b4ff48108,15294012
5e02ae26-d3d5-46d2-9359-65a186a4f511,15315906
...
```

## Response

Please send **only your Jupyter Notebook** with code and documentation for this task **by email** to daoyuan.li@finquest.com, with the subject line **"Data Engineer Test: CANDIDATE-NAME"**.

Please also specify how many hours it took you to finish this task in your email.

**Please note that emails not confirming to the above format will be ignored.**

## Notes

* Work on this task independently
* Never share this notebook nor the datasets with anyone else, don't post this task on the Internet
* Be structured on the steps you take to solve this task


In [1]:
import pandas as pd
import json



In [2]:
df_f = pd.read_csv('/content/source_f.csv')

In [None]:
df_f.head()

In [None]:
df_f.info()

In [None]:
df_f.describe()

In [6]:
df_l = pd.read_json('/content/source_l.json')

In [None]:
df_l.head(5)

In [None]:
df_l.info() , df_f.info()

total no of records in file  - 8778

total no of records in json - 5000




In [None]:
""" trying to find out unique values in each columns and compare the two datasets based on columns """
print("count of unique ids present in csv and json \ncsv file -  {} \njson file - {} ".format(len(df_f['_id'].unique()) , len(df_l['company_id'].unique())))

In [None]:
print(df_f['Name'].unique() ,df_l['name'].unique())

In [None]:
print(len(df_f['Name'].unique()) ,len(df_l['name'].unique()))

In [None]:
print(len(df_f['Name'].unique()) ,len(df_l['name'].unique()))

In [None]:
""" checking  name match between two dataframe (by converting every name to lower case )  """
list_file=df_f['Name'].unique()
list_json = df_l['name'].str.lower().unique()
count=0
for i in range(len(df_f['Name'].unique())):
  if list_file[i].lower() in list_json:
    count+=1
    print(list_file[i])
print(count)

In [None]:
""" checking  name match between two dataframe (without converting  name to any case )  """
list_file=df_f['Name'].unique()
list_json = df_l['name'].unique()
count=0
for i in range(len(df_f['Name'].unique())):
  if list_file[i] in list_json:
    count+=1
    print(list_file[i])
print(count)

In [15]:

"""creating key columns in both the data frames for join operation , """
## the reason behind pulling  name and city from both the data frame is becoz these are not null columns
df_f['Name_key'] = df_f['Name'].str.lower()
df_l['Name_key'] = df_l['name'].str.lower()
df_f['City_key'] = df_f['City'].str.lower()
df_f['Country_key'] = df_f['Country'].str.lower()
df_l['City_key'] = df_l['headquarters'].str.lower().str.split(',').str[0].str.strip()

In [None]:
df_l

In [17]:
print(len(df_f['Name'].unique()) ,len(df_l['name'].unique()))

6984 5000


In [None]:

# merginng the data frames

s1 = pd.merge(df_f, df_l, how='inner', on=['Name_key','City_key'])
s1

In [None]:
len(s1['_id'].unique()) ,len(s1['company_id'].unique())
## as per my understanding the key relationship was one to many where one company_id can be mappped to many _id


In [20]:
s1.to_csv('output.csv',header=True,index=False,columns=['_id','company_id'])

In [None]:
from IPython.display import display

matched = {

    "9d3ae4d9-396f-44b6-8d30-0007ab49f838": 24748,
    "9aa00d4d-8ac1-4eef-827a-0c9b4ff48108": 15294012,
    "5e02ae26-d3d5-46d2-9359-65a186a4f511": 15315906
}

for _id, company_id in matched.items():
    display(df_f[df_f['_id'] == _id])
    display(df_l[df_l['company_id'] == company_id])
    print()

Now trying to perform same join operation using pyspark

In [22]:
###setting up spark in colab environment
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.estointernet.in/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [23]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [24]:
import findspark
findspark.init()

In [25]:
findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2'

In [26]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Finquest_assignment").config('spark.ui.port', '4050').getOrCreate()

In [None]:
spark

In [28]:
df_f1 = spark.read.csv("source_f.csv",header=True, mode="DROPMALFORMED")

In [None]:
df_f1.show()
df_f1.count()

In [30]:
df_l1 = spark.read.option("multiline","true").json("source_l.json")

In [None]:
df_l1.show()
df_l1.count()

In [52]:
from pyspark.sql.functions import lower, col,split


In [60]:
df_f2=df_f1.withColumn("Name", lower(df_f1.Name))
df_l2 = df_l1.withColumn("name",lower(df_l1.name))
df_f2 = df_f2.withColumn("City",lower(df_f2.City))

df_l2  =df_l2.withColumn("headquarters",lower(split(df_l2.headquarters,',')[0]))

In [None]:
df_f2.show()

In [None]:
df_l2.show()

In [66]:
s2= df_f2.join(df_l2, [df_f2.Name == df_l2.name,df_f2.City == df_l2.headquarters], 'inner').select(df_f2._id, df_l2.company_id)

In [None]:
s2.show()

In [None]:
s2.count()

In [79]:
s2.write.options(header='True',delimiter =',').csv("/content/output_spark/")