In [1]:
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import seaborn as sns
import scipy as sp
from decimal import Decimal
from datetime import datetime
import re

%matplotlib inline

import findspark
findspark.init()

from pyspark.sql import *
from pyspark.sql.functions import  isnan, when, count, col,isnull

%matplotlib inline

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
sns.set_context('talk')
%matplotlib inline

sns.set_style("darkgrid", {"axes.facecolor": ".9"})
sns.set_context('notebook')

In [3]:
data_folder = './data/'

In [19]:
data = spark.read.csv(data_folder+'en.openfoodfacts.org.products.csv',header = True, sep="\t")

N_subset = 100

if N_subset>0:
    print("Subset with", N_subset, "products of the csv taken.")
    rdd = spark.sparkContext.parallelize(data.take(N_subset))
    data = spark.createDataFrame(rdd, data.schema)

Subset with 100 products of the csv taken.


In [20]:
data = data.na.drop(subset=['countries_en'])

## Number of products sold in each country

In [None]:
country_frequency = data.rdd.flatMap(lambda r: [(country, 1) for country in filter(None, r['countries_en'].split(','))]).reduceByKey(lambda a,b: a+b)

In [None]:
country_top10 = country_frequency.sortBy(lambda r: r[1], ascending = False).take(10)

In [None]:
country_top10_df = pd.DataFrame(country_top10, columns=['country', 'count']).set_index('country')

In [None]:
country_top10_df.plot(kind='barh', logx=True)
plt.title("Number of products sold by country.")
plt.show()

In [None]:
N_fr = country_top10_df.iloc[0]
N_us = country_top10_df.iloc[1]
N_all = sum([c[1] for c in country_frequency.countByValue().keys()])

print("sold in France: ", int(N_fr))
print("sold in the United States: ", int(N_us))
print("Total number of products: ", N_all)
print("Ratio: ", float((N_fr + N_us) / N_all))

In [None]:
###########################################################################################

In [None]:
data.write.mode('overwrite').parquet("data.parquet")

In [None]:
data = spark.read.parquet("data.parquet")

In [None]:
###########################################################################################

In [21]:
palm = data.select(['countries_en','ingredients_from_palm_oil_n','ingredients_that_may_be_from_palm_oil_n'])

In [22]:
palm = palm.na.drop()

In [24]:
palm_flat = palm.rdd.flatMap(lambda r:[(x, r['ingredients_from_palm_oil_n'], r['ingredients_that_may_be_from_palm_oil_n']) 
                                                for x in r['countries_en'].split(',')]).toDF()

In [25]:
palm_flat = palm_flat.selectExpr("_1 as countries_en", 
                                 "_2 as ingredients_from_palm_oil_n", 
                                 "_3 as ingredients_that_may_be_from_palm_oil_n")

In [26]:
palm_reduced = palm_flat.rdd.filter(lambda r: re.match(r'United State|France', r['countries_en'])).toDF()

In [27]:
palm_positive = palm_flat.filter("ingredients_from_palm_oil_n>0 OR ingredients_that_may_be_from_palm_oil_n>0")

In [30]:
palm_positive.groupBy("countries_en").count().collect()

[]