<a href="https://colab.research.google.com/github/nutellaweera/BD_Groupwork/blob/main/Group6_hadoopCurrencyConversion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook attempts to use Hadoop + PySpark for currency conversion of a large dataset.
References: 
* Hadoop+Spark on colab - https://www.analyticsvidhya.com/blog/2021/05/integration-of-python-with-hadoop-and-spark/
* Python currency conversion - https://medium.com/analytics-vidhya/convert-currencies-automatically-with-python-python-in-audit-2-6c574dbae44
* PySpark with pandas - https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/

In [None]:
# Install a headless OpenJDK 8 (quiet mode, output discarded).
# The JAVA_HOME / SPARK_HOME environment variables are set in a later cell, not here.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Download and unzip spark+hadoop
!wget -q https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [None]:
# Initialize findspark and set java vars
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

import findspark
findspark.init()

In [None]:
# Initialize a spark session
!pip install -q pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4055')\
        .getOrCreate()

In [None]:
# Load the food-prices CSV into a Spark DataFrame.
# The first row is treated as the header and column types are inferred from the data.
spark_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('wfpvam_foodprices.csv')
)

In [None]:
# Install the forex_python package and import its CurrencyRates class,
# which the following cells use for the currency conversion
!pip install forex_python
from forex_python.converter import CurrencyRates

In [None]:
# Pre-processing
from pyspark.sql import functions as F

# Add a date column built from the recorded year and month (day fixed to the 1st)
spark_df = spark_df.withColumn('formatted_date', F.expr("make_date(`year recorded`, `month recorded`, 01)"))

# Get the list of currencies the API can quote against USD and drop rows in any other currency.
c = CurrencyRates()
supported_currencies = list(c.get_rates('USD').keys())

# The rates table is quoted *against* USD, so USD itself may not appear among its keys
# (NOTE(review): confirm against the forex_python response). Rows already priced in USD
# need no API support to be "converted", so keep them explicitly instead of dropping them.
if 'USD' not in supported_currencies:
    supported_currencies.append('USD')

spark_df = spark_df.filter(spark_df['name of currency'].isin(supported_currencies))

In [None]:
# Define a UDF (user-defined function) that converts a price to USD,
# then drop the rows whose conversion failed
from pyspark.sql.types import FloatType

# Marker returned for rows that could not be converted.
# None is stored as a SQL NULL in the result column. A numeric sentinel is unsafe here:
# the column is a 32-bit FloatType, so a value such as 9999.9999 is rounded when stored
# and no longer compares equal to the original Python float, which left the failed rows
# in the data.
CONVERSION_ERROR = None

def convertCurrency(currency, price, date):
  """Convert `price` from `currency` to USD using the rate for `date`.

  Args:
    currency: currency code of the input price.
    price: amount expressed in `currency`.
    date: date passed to the converter to select the rate.

  Returns:
    The converted amount rounded to 4 decimal places, or CONVERSION_ERROR
    (None) if the conversion raised for any reason.
  """
  try:
    return round(c.convert(currency, 'USD', price, date), 4)
  except Exception:
    # Deliberately best-effort: a failed lookup marks the row instead of
    # aborting the whole job. `Exception` (not a bare except) so that
    # KeyboardInterrupt / SystemExit still propagate.
    return CONVERSION_ERROR # to find error rows later

# convertCurrency is called once per row.
convert_currency = F.udf(convertCurrency, FloatType())
spark_df = spark_df.withColumn('price_USD', convert_currency('name of currency', 'price paid', 'formatted_date'))

# Filter out rows with exceptions (they carry NULL in price_USD)
spark_df = spark_df.filter(spark_df['price_USD'].isNotNull())

In [None]:
# Show how many partitions back the DataFrame's underlying RDD
spark_df.rdd.getNumPartitions()

2

In [None]:
# Print the column names, types and nullability of the processed DataFrame
spark_df.printSchema()

root
 |-- country id: double (nullable = true)
 |-- country name: string (nullable = true)
 |-- locality id: integer (nullable = true)
 |-- locality name: string (nullable = true)
 |-- market id: integer (nullable = true)
 |-- market name5: string (nullable = true)
 |-- commodity purchase id: integer (nullable = true)
 |-- commodity purchased: string (nullable = true)
 |-- currency id: double (nullable = true)
 |-- name of currency: string (nullable = true)
 |-- market type id: integer (nullable = true)
 |-- market name11: string (nullable = true)
 |-- measurement id: integer (nullable = true)
 |-- unit of goods measurement: string (nullable = true)
 |-- month recorded: integer (nullable = true)
 |-- year recorded: integer (nullable = true)
 |-- price paid: double (nullable = true)
 |-- mp_commoditysource: string (nullable = true)
 |-- formatted_date: date (nullable = true)
 |-- price_USD: float (nullable = true)



In [None]:
# Write the converted data with Spark's CSV writer.
# header=True keeps the column names in the output (the input was read with a header),
# and mode='overwrite' lets the cell be re-run: the default save mode raises an error
# when the target path already exists.
spark_df.write.csv('currency_converted.csv', header=True, mode='overwrite')

In [None]:
# Collect the Spark DataFrame into pandas and save it from there, so the result
# ends up in a single CSV file (the Spark DataFrame has 2 partitions)
import pandas as pd

# Enable Arrow for the Spark -> pandas conversion, together with its fallback option.
for _arrow_option in (
    "spark.sql.execution.arrow.pyspark.enabled",
    "spark.sql.execution.arrow.pyspark.fallback.enabled",
):
    spark.conf.set(_arrow_option, "true")

pandas_df = spark_df.toPandas()
pandas_df.to_csv('foodprices_converted.csv')