# Task

Please analyze the cigarette market in the Philippines and build an ML model that can forecast income from
cigarette sales in the Philippines in 2020.

### Requirements

- You can use any publicly available datasets
- Use Apache Spark for data processing
- The results should be presented in a clear form

# 1. Data Understanding

### 1.1. Short Data description

There are 4 data files to process:
- population-growth.csv - population growth in the Philippines
- smoking-prevalence.csv - the percentage of each country's adult population that smokes
- consumption-cigarettes-per_day.csv - cigarette consumption per smoker per day, for each country
- retail-price-for-a-pack-of-20-cigarette.xlsx - the price of one pack of 20 cigarettes, for each country

### 1.2. The logic for calculating income from cigarette sales

I did not find data that reports yearly income from cigarette sales directly, so we calculate it with the following formulas:

Smokers (year) = Population (year) * Prevalence (year) / 100. This is the number of smokers in a given year. Population (year) comes from population-growth.csv. Prevalence (year) comes from smoking-prevalence.csv, where it is given as a percentage, hence the division by 100.

Income (daily) = Consumption (day) * Price / 20. Consumption (day) is the number of cigarettes one smoker smokes per day; it comes from consumption-cigarettes-per_day.csv. Price is the price of one pack of cigarettes (from cigarettes prices.csv). One pack contains 20 cigarettes, so Price / 20 is the price of a single cigarette.

Income (year) = Smokers (year) * Income (daily) * 365. This is the income from selling cigarettes per year.
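
The three formulas can be checked with a short calculation. This is a minimal sketch in plain Python: the function name `yearly_income` is hypothetical, and it assumes prevalence is a percentage (as in smoking-prevalence.csv), so it is divided by 100. The sample values are the 2018 row of the final dataset in section 2.3.

```python
CIGARETTES_PER_PACK = 20
DAYS_PER_YEAR = 365


def yearly_income(population, prevalence_pct, consumption_per_day, pack_price):
    """Income (year) = Smokers (year) * Income (daily) * 365."""
    # Assumption: prevalence is a percentage (e.g. 23.4), not a fraction.
    smokers = population * prevalence_pct / 100
    # Daily spend of one smoker: cigarettes per day times the price of one cigarette.
    daily_income = consumption_per_day * pack_price / CIGARETTES_PER_PACK
    return smokers * daily_income * DAYS_PER_YEAR


# 2018 row of the final dataset (population, prevalence %, cigarettes/day, pack price)
income_2018 = yearly_income(106651394, 23.4, 21.5, 3.95)
print(f"{income_2018:,.0f}")
```

The result is expressed in the same currency as the pack price, here the price units of the final dataset.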

# 2. Data Processing

Data processing is done with Apache Spark.

### 2.1. Set up PySpark for the data processing

In [1]:
# Import libraries
import findspark
findspark.init()
import pyspark

# Set up Context and Session
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
conf = SparkConf().setMaster("local").setAppName("CigarettesPhilippinesIncome")
sc = SparkContext(conf = conf)
spark = SparkSession \
    .builder \
    .appName("CigarettesPhilippinesIncome") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### 2.2. Read and merge data from files

##### 2.2.1. Population in the Philippines

In [2]:
# Read data from csv file
population = spark.read.csv("Data_Files/population-growth.csv", header=True)
population.show(5)

+----+---------------+-----------+-------------+----------+-------------------+----------+
|Year|TotalPopulation|PercentMale|PercentFemale|PopDensity|TotalPopulationRank|GrowthRate|
+----+---------------+-----------+-------------+----------+-------------------+----------+
|2019|    108116.6150|     0.5024|       0.4976|    315.80|                 13|    0.0137|
|2018|    106651.3940|     0.5026|       0.4974|    311.52|                 13|    0.0141|
|2017|    105172.9250|     0.5028|       0.4972|    307.21|                 13|    0.0146|
|2016|    103663.8160|     0.5030|       0.4970|    302.80|                 12|    0.0152|
|2015|    102113.2120|     0.5033|       0.4967|    298.27|                 12|    0.0168|
+----+---------------+-----------+-------------+----------+-------------------+----------+
only showing top 5 rows



In [3]:
# We only need the Year and TotalPopulation columns. Create a DataFrame with just those columns
population = population.select(population['Year'], population['TotalPopulation'])
population.show(5)

+----+---------------+
|Year|TotalPopulation|
+----+---------------+
|2019|    108116.6150|
|2018|    106651.3940|
|2017|    105172.9250|
|2016|    103663.8160|
|2015|    102113.2120|
+----+---------------+
only showing top 5 rows



##### 2.2.2. Smoking Prevalence

In [4]:
# Read Data
perseverance = spark.read.csv("Data_Files/smoking-prevalence.csv", header=True)
perseverance.show(5)

+-------+----+----+--------------------------------------------------+
| Entity|Code|Year|Smoking prevalence, total (ages 15+) (% of adults)|
+-------+----+----+--------------------------------------------------+
|Albania| ALB|2000|                                              34.8|
|Albania| ALB|2005|                                              32.7|
|Albania| ALB|2010|                                              31.2|
|Albania| ALB|2011|                                              30.7|
|Albania| ALB|2012|                                              30.2|
+-------+----+----+--------------------------------------------------+
only showing top 5 rows



In [5]:
# Keep only the rows for the Philippines
persevelance = perseverance.select(perseverance['Year'], 
                                   perseverance['Smoking prevalence, total (ages 15+) (% of adults)'])\
.filter(perseverance['Entity'] == 'Philippines')
persevelance.show(5)

+----+--------------------------------------------------+
|Year|Smoking prevalence, total (ages 15+) (% of adults)|
+----+--------------------------------------------------+
|2000|                                              34.4|
|2005|                                              30.6|
|2010|                                              27.6|
|2011|                                                27|
|2012|                                              26.5|
+----+--------------------------------------------------+
only showing top 5 rows



##### 2.2.3. Merge population and prevalence sets

In [6]:
# Use outer join
smokers = population.join(persevelance, 'Year', "outer")
smokers.show(5)

+----+---------------+--------------------------------------------------+
|Year|TotalPopulation|Smoking prevalence, total (ages 15+) (% of adults)|
+----+---------------+--------------------------------------------------+
|2016|    103663.8160|                                              24.3|
|2012|           null|                                              26.5|
|2019|    108116.6150|                                              null|
|2017|    105172.9250|                                              null|
|2014|           null|                                              25.3|
+----+---------------+--------------------------------------------------+
only showing top 5 rows



Some values are NULL. They are handled below, in section 2.3.

In [7]:
# Print the schema to check column names and types
# before renaming the column Smoking prevalence, total (ages 15+) (% of adults)
smokers.printSchema()

root
 |-- Year: string (nullable = true)
 |-- TotalPopulation: string (nullable = true)
 |-- Smoking prevalence, total (ages 15+) (% of adults): string (nullable = true)



In [8]:
# Rename column
smokers = smokers.withColumnRenamed('Smoking prevalence, total (ages 15+) (% of adults)', 'Prevelence')
smokers.show(5)

+----+---------------+----------+
|Year|TotalPopulation|Prevelence|
+----+---------------+----------+
|2016|    103663.8160|      24.3|
|2012|           null|      26.5|
|2019|    108116.6150|      null|
|2017|    105172.9250|      null|
|2014|           null|      25.3|
+----+---------------+----------+
only showing top 5 rows



##### 2.2.4. Read data about cigarette consumption

In [9]:
# Read data from CSV 
consumptions = spark.read.csv('Data_Files/consumption-cigarettes-per_day.csv', header=True)
consumptions.show(5)

+-----------+----+----+-----------------------------------------------------+
|     Entity|Code|Year|Cigarette consumption per smoker per day (cigarettes)|
+-----------+----+----+-----------------------------------------------------+
|Afghanistan| AFG|1980|                                            5.6999998|
|Afghanistan| AFG|1981|                                            5.8000002|
|Afghanistan| AFG|1982|                                            5.8000002|
|Afghanistan| AFG|1983|                                            5.9000001|
|Afghanistan| AFG|1984|                                                    6|
+-----------+----+----+-----------------------------------------------------+
only showing top 5 rows



In [10]:
# Select only data related to the Philippines
consumptions = consumptions.select(consumptions['Year'], 
                                   consumptions['Cigarette consumption per smoker per day (cigarettes)']).\
filter(consumptions['Entity'] == 'Philippines')
consumptions.show(5)

+----+-----------------------------------------------------+
|Year|Cigarette consumption per smoker per day (cigarettes)|
+----+-----------------------------------------------------+
|1980|                                                 18.5|
|1981|                                                 18.4|
|1982|                                            18.200001|
|1983|                                                 18.1|
|1984|                                                   18|
+----+-----------------------------------------------------+
only showing top 5 rows



In [11]:
# Change column name for Cigarette consumption per smoker per day (cigarettes)
consumptions = consumptions.withColumnRenamed('Cigarette consumption per smoker per day (cigarettes)', 'Consumption')
consumptions.show(5)

+----+-----------+
|Year|Consumption|
+----+-----------+
|1980|       18.5|
|1981|       18.4|
|1982|  18.200001|
|1983|       18.1|
|1984|         18|
+----+-----------+
only showing top 5 rows



##### 2.2.5. Merge Consumption and main DataFrame

In [12]:
smokers = smokers.join(consumptions, 'Year', "outer")
smokers.show(5)

+----+---------------+----------+-----------+
|Year|TotalPopulation|Prevelence|Consumption|
+----+---------------+----------+-----------+
|1987|           null|      null|  17.799999|
|2016|    103663.8160|      24.3|       null|
|2012|           null|      26.5|       21.4|
|1988|           null|      null|  17.700001|
|2019|    108116.6150|      null|       null|
+----+---------------+----------+-----------+
only showing top 5 rows



##### 2.2.6. Read cigarette price data

In [13]:
prices = spark.read.csv("Data_Files/cigarettes prices.csv", header=True)
prices.show(5)

+-----------+----+-------------------------------------------------+----------------------------------------------------------+--------------------------------------------------------------+-----------------------------------------------------------------------+-----------------------------+--------------------------------------+---------------------------------------------------+------------------------------------------------------------+
|    Country|Year|Most sold brand of cigarettes - currency reported|Most sold brand of cigarettes - price in currency reported|Most sold brand of cigarettes - price in international dollars|Most sold brand of cigarettes - price in US$ at official exchange rates|Average -  cigarette currency|Average -  cigarette price in currency|Average -  cigarette price in international dollars|Average -  cigarette price in US$ at official exchange rates|
+-----------+----+-------------------------------------------------+------------------------------------------

In [14]:
# Select needed columns for Philippines and rename them
prices = prices.select(prices['Year'], 
                       prices['Most sold brand of cigarettes - price in international dollars'] , 
                       prices['Average -  cigarette price in international dollars']).\
filter(prices['Country'] == 'Philippines')
prices = prices.withColumnRenamed("Most sold brand of cigarettes - price in international dollars", "MostSoldBrandPrice").\
withColumnRenamed("Average -  cigarette price in international dollars", "AveragePrice")
prices.show(12)

+----+------------------+--------------+
|Year|MostSoldBrandPrice|  AveragePrice|
+----+------------------+--------------+
|2014|             01.03|          1.86|
|2012|              0.59|          1.23|
|2010|              0.57|Not applicable|
|2008|              0.52|Not applicable|
+----+------------------+--------------+



The main issue with this dataset is that the AveragePrice column has only 2 values. Of all the data in cigarettes prices.csv, AveragePrice is the most relevant for calculating income.
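
Markers such as `Not applicable` also keep the column from being read as numeric. A minimal sketch of the cleaning step in plain Python follows; the helper name `parse_price` is hypothetical and not part of the original notebook.

```python
def parse_price(raw):
    """Return the price as a float, or None when the cell holds no number."""
    if raw is None:
        return None
    try:
        return float(raw.strip())
    except ValueError:
        # Non-numeric markers such as "Not applicable" become missing values.
        return None


# AveragePrice strings as they appear in the Philippines rows above
raw_average_prices = {"2008": "Not applicable", "2010": "Not applicable",
                      "2012": "1.23", "2014": "1.86"}
average_prices = {year: parse_price(value) for year, value in raw_average_prices.items()}
known = {year: price for year, price in average_prices.items() if price is not None}
print(known)
```

The same function also normalizes oddly formatted numbers such as `01.03`.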

Additional data for the most sold brand's price was found. It lets us estimate the relationship between MostSoldBrandPrice and AveragePrice.

In [15]:
most_sold = spark.read.csv("Data_Files/Yearly-most-popular-price.csv", header=True)
most_sold.show()

+----+------------+
|Year|Price in USD|
+----+------------+
|2008|        0.73|
|2010|         0.8|
|2012|        0.84|
|2014|        1.47|
|2016|        2.26|
|2018|        3.14|
+----+------------+



In [16]:
# Merge DataFrames related to the price
prices = prices.join(most_sold, 'Year', "outer").orderBy('Year')
prices.show()

+----+------------------+--------------+------------+
|Year|MostSoldBrandPrice|  AveragePrice|Price in USD|
+----+------------------+--------------+------------+
|2008|              0.52|Not applicable|        0.73|
|2010|              0.57|Not applicable|         0.8|
|2012|              0.59|          1.23|        0.84|
|2014|             01.03|          1.86|        1.47|
|2016|              null|          null|        2.26|
|2018|              null|          null|        3.14|
+----+------------------+--------------+------------+



We see that the data correlate: when MostSoldBrandPrice grows, AveragePrice grows too. This relationship is used to fill in the missing AveragePrice values for the model.
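
One way to use this relationship is sketched below. It is only one possible reading of "use correlated data", not necessarily the method behind the final dataset. For each year without an AveragePrice, it takes the AveragePrice / Price in USD ratio from the nearest year where both are known and applies it to that year's Price in USD. The function name `fill_average_price` is hypothetical.

```python
def fill_average_price(average_price, reference_price):
    """Fill missing average prices by scaling a reference price series."""
    known_years = [year for year, price in average_price.items()
                   if price is not None and reference_price.get(year) is not None]
    if not known_years:
        raise ValueError("need at least one year with both prices")
    filled = {}
    for year, ref in reference_price.items():
        current = average_price.get(year)
        if current is not None:
            filled[year] = current  # keep observed values untouched
            continue
        if ref is None:
            continue  # nothing to scale from
        # Ratio from the nearest year where both prices are known.
        nearest = min(known_years, key=lambda y: abs(y - year))
        ratio = average_price[nearest] / reference_price[nearest]
        filled[year] = round(ref * ratio, 2)
    return filled


# Values from the merged price table above
average_price = {2008: None, 2010: None, 2012: 1.23, 2014: 1.86, 2016: None, 2018: None}
price_in_usd = {2008: 0.73, 2010: 0.80, 2012: 0.84, 2014: 1.47, 2016: 2.26, 2018: 3.14}
print(fill_average_price(average_price, price_in_usd))
```

With only two overlapping years, any such estimate is rough. A regression on more overlapping data would be preferable if it were available.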

In [17]:
# Create a DataFrame that consists of only Year and AveragePrice
# Register a global temporary view (createOrReplace avoids an error when the cell is re-run)
prices.createOrReplaceGlobalTempView("prices")
av_price = spark.sql("SELECT Year, AveragePrice FROM global_temp.prices")
av_price.show()

+----+--------------+
|Year|  AveragePrice|
+----+--------------+
|2008|Not applicable|
|2010|Not applicable|
|2012|          1.23|
|2014|          1.86|
|2016|          null|
|2018|          null|
+----+--------------+



##### 2.2.7. Final merge between main DataFrame and Average Price Dataframe

In [18]:
all_data = smokers.join(av_price, 'Year', "outer").orderBy('Year')

In [19]:
all_data.show(50)

+----+---------------+----------+-----------+--------------+
|Year|TotalPopulation|Prevelence|Consumption|  AveragePrice|
+----+---------------+----------+-----------+--------------+
|1950|     18580.4900|      null|       null|          null|
|1955|     22177.0580|      null|       null|          null|
|1960|     26269.7340|      null|       null|          null|
|1965|     30909.9880|      null|       null|          null|
|1970|     35803.5940|      null|       null|          null|
|1975|     41285.7420|      null|       null|          null|
|1980|     47357.7430|      null|       18.5|          null|
|1981|           null|      null|       18.4|          null|
|1982|           null|      null|  18.200001|          null|
|1983|           null|      null|       18.1|          null|
|1984|           null|      null|         18|          null|
|1985|     54275.8220|      null|         18|          null|
|1986|           null|      null|       17.9|          null|
|1987|           null|  

### 2.3. Data preparation for algorithms

There are many empty values, which should be interpolated.
Also, cigarette price data exist only from 2008 onward.
This means that all historical data can be used for interpolation, but only data from 2008 and later can be used as input for the algorithm.

The logic for interpolation is as follows:
- The step is one year
- If the data have a linear or near-linear relationship, we determine the yearly step of growth or decline and apply it to all missing values. A linear relationship can be used for TotalPopulation
- If values within some range barely change, we use mean values. This rule is applied to the Prevalence and Consumption data
- If it's impossible to determine the relationship because of a lack of data, we use correlated data. This method is used for AveragePrice
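
The linear rule for TotalPopulation can be sketched as follows. This is a minimal illustration in plain Python, not the exact procedure used for the final dataset; the function name `interpolate_linear` is hypothetical. The two known points are the 1980 and 1985 TotalPopulation values (in thousands) from the merged table above.

```python
def interpolate_linear(known):
    """Fill every missing year between known points with a constant yearly step."""
    years = sorted(known)
    filled = {}
    for start, end in zip(years, years[1:]):
        # Yearly step of growth (or decline) between two known points.
        step = (known[end] - known[start]) / (end - start)
        for year in range(start, end):
            filled[year] = known[start] + step * (year - start)
    filled[years[-1]] = known[years[-1]]
    return filled


population = interpolate_linear({1980: 47357.743, 1985: 54275.822})
for year, value in population.items():
    print(year, round(value, 3))
```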

Applying the rules described above gives the final dataset:

In [20]:
# Build the final dataset by hand; the field order must match the schema below
temp = Row("Year", "Population", "Consumption", "Persevelance", "AveragePrice")
temp1 = temp('2008', 90910000, 21.2, 26.4, 1.07)
temp2 = temp('2009', 92438000, 21.5, 27.0, 1.12)
temp3 = temp('2010', 93966000, 21.5, 27.6, 1.17)
temp4 = temp('2011', 95595000, 21.4, 27.0, 1.2)
temp5 = temp('2012', 97224000, 21.4, 26.5, 1.23)
temp6 = temp('2013', 98954000, 21.6, 25.9, 1.55)
temp7 = temp('2014', 100483000, 21.5, 25.3, 1.86)
temp8 = temp('2015', 102113000, 21.3, 24.7, 2.355)
temp9 = temp('2016', 103663816, 21.4, 24.3, 2.85)
temp10 = temp('2017', 105172925, 21.3, 23.9, 3.4)
temp11 = temp('2018', 106651394, 21.5, 23.4, 3.95)
schema = StructType([StructField("Year", StringType(), True),
                     StructField("Population", IntegerType(), True),
                     StructField("Consumption", FloatType(), True),
                     StructField("Persevelance", FloatType(), True),
                     StructField("AveragePrice", FloatType(), True)])
data = spark.createDataFrame([temp1, temp2, temp3, temp4, temp5, temp6, temp7, temp8, temp9, temp10, temp11], schema)

In [21]:
data.show()

+----+----------+-----------+------------+------------+
|Year|Population|Consumption|Persevelance|AveragePrice|
+----+----------+-----------+------------+------------+
|2008|  90910000|       21.2|        26.4|        1.07|
|2009|  92438000|       21.5|        27.0|        1.12|
|2010|  93966000|       21.5|        27.6|        1.17|
|2011|  95595000|       21.4|        27.0|         1.2|
|2012|  97224000|       21.4|        26.5|        1.23|
|2013|  98954000|       21.6|        25.9|        1.55|
|2014| 100483000|       21.5|        25.3|        1.86|
|2015| 102113000|       21.3|        24.7|       2.355|
|2016| 103663816|       21.4|        24.3|        2.85|
|2017| 105172925|       21.3|        23.9|         3.4|
|2018| 106651394|       21.5|        23.4|        3.95|
+----+----------+-----------+------------+------------+



In [22]:
# Save data to csv file
data.write.format("csv").save('Data_Files/CSV')

As output, Spark writes a directory of CSV part files (Data_Files/CSV) that ML algorithms can use for training models.