In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 24 kB/s s eta 0:00:01
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.9 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612244 sha256=2846c4ad82a189f7b8c0d162caa5c1bb228d8bf4d9e999abed401861895671db
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.[0m


In [3]:
from csv import reader
from pyspark.sql import Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
import warnings
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col, isnan, when, count, trim, desc, sum, asc
from pyspark.sql.functions import countDistinct, explode, split, concat_ws, collect_list
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

import os
# Export PYSPARK_PYTHON before any Spark session exists in this process
# (presumably so PySpark launches its Python processes with python3).
os.environ.update({"PYSPARK_PYTHON": "python3"})

In [4]:
# Create the session used by every cell below, or reuse one that already exists.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting
# rather than a real Spark option - confirm and drop it if it is not needed.
spark = (
    SparkSession.builder
    .appName("antioxidants analysis")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [23]:
# Schema applied to each input CSV: three string columns followed by one
# double column that holds the antioxidant measurement.
dSchema = R(
    [Fld(name, Str()) for name in ("product", "origin", "procured_in")]
    + [Fld("antioxi_mmol_100g", Dbl())]
)

In [24]:
# Read the three source tables. Each file has a header row and is parsed
# with the same explicit schema (dSchema) instead of schema inference.
dffruits, dfvegetables, dfnuts = (
    spark.read.csv(path, header=True, schema=dSchema)
    for path in (
        "../input/fruits/Fruit.csv",
        "../input/vegetables/Vegetables.csv",
        "../input/nuts-and-seeds/Nuts and seeds.csv",
    )
)

In [25]:
# Discard the origin column and label every row of this table as 'Fruit'.
dffruits = dffruits.drop('origin').withColumn('category', lit('Fruit'))

# Preview the first rows, then report the row count as the cell's result.
dffruits.show(5)
dffruits.count()

+--------------------+-----------+-----------------+--------+
|             product|procured_in|antioxi_mmol_100g|category|
+--------------------+-----------+-----------------+--------+
|Apples, Composite...|        USA|             0.31|   Fruit|
|       Apples, dried|     Norway|             1.86|   Fruit|
|       Apples, dried|     Norway|             3.49|   Fruit|
|Apples, dried (Ta...|New Zealand|             6.07|   Fruit|
|        Apples, Fuji|        USA|             0.22|   Fruit|
+--------------------+-----------+-----------------+--------+
only showing top 5 rows



398

In [26]:
# Discard the origin column and label every row of this table as 'Vegetables'.
dfvegetables = dfvegetables.drop('origin').withColumn('category', lit('Vegetables'))

# Preview the first rows, then report the row count as the cell's result.
dfvegetables.show(5)
dfvegetables.count()

+--------------------+-----------+-----------------+----------+
|             product|procured_in|antioxi_mmol_100g|  category|
+--------------------+-----------+-----------------+----------+
|        Alfa sprouts|     Norway|             0.14|Vegetables|
|           Artichoke|     Norway|             0.69|Vegetables|
|   Artichoke, boiled|        USA|             3.89|Vegetables|
|   Artichoke, boiled|        USA|             4.54|Vegetables|
|Artichoke, brine ...|        USA|             3.36|Vegetables|
+--------------------+-----------+-----------------+----------+
only showing top 5 rows



303

In [27]:
# Discard the origin column and label every row of this table as 'Nuts'.
dfnuts = dfnuts.drop('origin').withColumn('category', lit('Nuts'))

# Preview the first rows, then report the row count as the cell's result.
dfnuts.show(5)
dfnuts.count()

+--------------------+-----------+-----------------+--------+
|             product|procured_in|antioxi_mmol_100g|category|
+--------------------+-----------+-----------------+--------+
|Almonds, with pel...|     Norway|             0.23|    Nuts|
|Almonds, with pel...|     Norway|             0.37|    Nuts|
|Almonds, with pel...|     Norway|             0.28|    Nuts|
|Almonds, with pel...|        USA|             0.53|    Nuts|
|Almonds, with pel...|     Norway|             0.26|    Nuts|
+--------------------+-----------+-----------------+--------+
only showing top 5 rows



90

In [28]:
# Stack the three category tables into one DataFrame. Columns are matched by
# position and duplicate rows are kept.
All = dffruits.union(dfvegetables).union(dfnuts)

# Preview the combined table, then report its total row count.
All.show(5)
All.count()

+--------------------+-----------+-----------------+--------+
|             product|procured_in|antioxi_mmol_100g|category|
+--------------------+-----------+-----------------+--------+
|Apples, Composite...|        USA|             0.31|   Fruit|
|       Apples, dried|     Norway|             1.86|   Fruit|
|       Apples, dried|     Norway|             3.49|   Fruit|
|Apples, dried (Ta...|New Zealand|             6.07|   Fruit|
|        Apples, Fuji|        USA|             0.22|   Fruit|
+--------------------+-----------+-----------------+--------+
only showing top 5 rows



791

In [29]:
# Shorten each product name to the text before its first comma
# (e.g. "Apples, dried" becomes "Apples"). The rebuilt column ends up last.
name_parts = split(col("product"), ",")
All = (
    All.withColumn('productArray', name_parts)
       .drop('product')
       .withColumn('product', col('productArray').getItem(0))
       .drop('productArray')
)
All.show(5)

+-----------+-----------------+--------+-------+
|procured_in|antioxi_mmol_100g|category|product|
+-----------+-----------------+--------+-------+
|        USA|             0.31|   Fruit| Apples|
|     Norway|             1.86|   Fruit| Apples|
|     Norway|             3.49|   Fruit| Apples|
|New Zealand|             6.07|   Fruit| Apples|
|        USA|             0.22|   Fruit| Apples|
+-----------+-----------------+--------+-------+
only showing top 5 rows



In [30]:
# Mean antioxidant value per category, highest first.
anti_category = (
    All.groupBy("category")
       .agg(F.avg("antioxi_mmol_100g").alias('avg_antioxidant'))
       .orderBy('avg_antioxidant', ascending=False)
)
anti_category.show(truncate=False)

+----------+------------------+
|category  |avg_antioxidant   |
+----------+------------------+
|Nuts      |4.574333333333334 |
|Fruit     |3.8280352644836255|
|Vegetables|0.8024752475247526|
+----------+------------------+



In [31]:
# Mean antioxidant value per simplified product name, highest first;
# only the five highest-ranked products are printed.
anti_product = (
    All.groupBy("product")
       .agg(F.avg("antioxi_mmol_100g").alias('avg_antioxidant'))
       .orderBy('avg_antioxidant', ascending=False)
)
anti_product.show(5, truncate=False)

AnalysisException: cannot resolve '`product_name`' given input columns: [antioxi_mmol_100g, category, procured_in, product];;
'Aggregate ['product_name], ['product_name, avg(antioxi_mmol_100g#641) AS avg_antioxidant#913]
+- Project [procured_in#640, antioxi_mmol_100g#641, category#662, product#840]
   +- Project [procured_in#640, antioxi_mmol_100g#641, category#662, productArray#830, productArray#830[0] AS product#840]
      +- Project [procured_in#640, antioxi_mmol_100g#641, category#662, productArray#830]
         +- Project [product#638, procured_in#640, antioxi_mmol_100g#641, category#662, split(product#638, ,, -1) AS productArray#830]
            +- Union
               :- Project [product#638, procured_in#640, antioxi_mmol_100g#641, category#662]
               :  +- Project [product#638, origin#639, procured_in#640, antioxi_mmol_100g#641, Fruit AS category#662]
               :     +- Relation[product#638,origin#639,procured_in#640,antioxi_mmol_100g#641] csv
               :- Project [product#646, procured_in#648, antioxi_mmol_100g#649, category#701]
               :  +- Project [product#646, origin#647, procured_in#648, antioxi_mmol_100g#649, Vegetables AS category#701]
               :     +- Relation[product#646,origin#647,procured_in#648,antioxi_mmol_100g#649] csv
               +- Project [product#654, procured_in#656, antioxi_mmol_100g#657, category#740]
                  +- Project [product#654, origin#655, procured_in#656, antioxi_mmol_100g#657, Nuts AS category#740]
                     +- Relation[product#654,origin#655,procured_in#656,antioxi_mmol_100g#657] csv


In [None]:
# Register the combined DataFrame as the temp view "All" (replacing any
# existing view of that name) so it can be queried through spark.sql.
All.createOrReplaceTempView("All")     

In [None]:
# Top 10 fruits procured in the USA, ranked by average antioxidant value.
# Column names match the "All" view (product, antioxi_mmol_100g, procured_in,
# category); the previous product_name / antioxidant_content_mmol_100g
# references do not exist there. GROUP BY already yields one row per product,
# so DISTINCT is not needed.
spark.sql("""select
             product,
             avg(antioxi_mmol_100g) as avg_antioxidants
             from All
             where category = 'Fruit' and procured_in = 'USA'
             group by product
             order by avg_antioxidants desc
             limit 10
""").show(truncate = False)

In [None]:
# Top 10 vegetables procured in the USA, ranked by average antioxidant value.
# Column names match the "All" view (product, antioxi_mmol_100g, procured_in,
# category); the previous product_name / antioxidant_content_mmol_100g
# references do not exist there. GROUP BY already yields one row per product,
# so DISTINCT is not needed.
spark.sql("""select
             product,
             avg(antioxi_mmol_100g) as avg_antioxidants
             from All
             where category = 'Vegetables' and procured_in = 'USA'
             group by product
             order by avg_antioxidants desc
             limit 10
""").show(truncate = False)

In [None]:
# Top 10 nuts procured in the USA, ranked by average antioxidant value.
# The category filter is 'Nuts' because that is the label assigned to the nuts
# table when it was tagged; 'Nuts and Seeds' matches no rows. Column names
# match the "All" view (product, antioxi_mmol_100g, procured_in, category).
# GROUP BY already yields one row per product, so DISTINCT is not needed.
spark.sql("""select
             product,
             avg(antioxi_mmol_100g) as avg_antioxidants
             from All
             where category = 'Nuts' and procured_in = 'USA'
             group by product
             order by avg_antioxidants desc
             limit 10
""").show(truncate = False)

In [None]:
# Load the pipe-delimited text file of selected foods and their prices.
# The first line is treated as the header and column types are inferred.
# NOTE(review): this is a dbfs: URI while the other inputs are read from
# ../input/... - confirm the path resolves in the environment this runs in.
dfprices = (
    spark.read
    .option("header", "true")
    .option("delimiter", "|")
    .option("inferSchema", "true")
    .csv("dbfs:/FileStore/tables/30_products_w_prices-1.txt")
)

dfprices.show(10, truncate=False)

In [None]:
# List the column names of the price table (taken from the file's header row).
dfprices.columns

In [None]:
# Register the price table as the temp view "dfprices" (replacing any
# existing view of that name) so it can be queried through spark.sql.
dfprices.createOrReplaceTempView("dfprices")     

In [None]:
# Rank the priced foods by antioxidant amount obtained per dollar:
#   mmol_one_dollar = avg_antioxidant_mmol_100g / 0.22 / price_per_pound
# NOTE(review): 0.22 is presumably the weight of 100 g expressed in pounds
# (about 0.2205 lb), converting the per-100 g value to a per-pound value before
# dividing by the per-pound price - confirm the units of price_per_pound.
spark.sql("""select distinct
             product,
             avg_antioxidant_mmol_100g as avg_antioxidants,
             price_per_pound as price_per_pound,
             avg_antioxidant_mmol_100g/0.22/price_per_pound as mmol_one_dollar
             from dfprices
             order by mmol_one_dollar desc
""").show(truncate = False)