In [None]:
# Installing spark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=091194425ed5a0dd429ffcb70c8aab820b975e1cacca6dcaaa210bae3d703864
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
# Importing the dataset
!wget https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
!unzip transactions_amostra.csv.zip

--2023-06-05 16:38:36--  https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
Resolving jpbarddal.github.io (jpbarddal.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jpbarddal.github.io (jpbarddal.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47513871 (45M) [application/zip]
Saving to: ‘transactions_amostra.csv.zip’


2023-06-05 16:38:39 (147 MB/s) - ‘transactions_amostra.csv.zip’ saved [47513871/47513871]

Archive:  transactions_amostra.csv.zip
  inflating: transactions_amostra.csv  
  inflating: __MACOSX/._transactions_amostra.csv  


In [None]:
# Criando a sessao
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, avg, count, concat_ws, lit, sum

spark = SparkSession.builder\
                    .master('local[*]')\
                    .appName('theo').getOrCreate()
sc = spark.sparkContext

In [None]:
# Creating the dataframe loading the csv file
df = spark.read.csv('transactions_amostra.csv', 
                    sep=';', 
                    header=True, 
                    inferSchema=True)

In [None]:
df.printSchema()

# VIEW (limitação pois não fazemos update!)
df.createOrReplaceTempView('tabela')


root
 |-- country_or_area: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- comm_code: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- flow: string (nullable = true)
 |-- trade_usd: long (nullable = true)
 |-- weight_kg: double (nullable = true)
 |-- quantity_name: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- category: string (nullable = true)



In [None]:
df.show()

+--------------------+----+---------+--------------------+---------+---------+------------+-------------------+------------+--------------------+
|     country_or_area|year|comm_code|           commodity|     flow|trade_usd|   weight_kg|      quantity_name|    quantity|            category|
+--------------------+----+---------+--------------------+---------+---------+------------+-------------------+------------+--------------------+
|             Belgium|2016|   920510|Brass-wind instru...|   Export|   571297|      3966.0|    Number of items|      4135.0|92_musical_instru...|
|           Guatemala|2008|   660200|Walking-sticks, s...|   Export|    35022|      5575.0|    Number of items|     10089.0|66_umbrellas_walk...|
|            Barbados|2006|   220210|Beverage waters, ...|Re-Export|    81058|     44458.0|   Volume in litres|     24113.0|22_beverages_spir...|
|             Tunisia|2016|   780411|Lead foil of a th...|   Import|     4658|       121.0|Weight in kilograms|       121.0|

##**1:** **The number of transactions involving Brazil.**

**Dataframe Solution**

In [None]:
# Picking only the transactions from Brazil and counting the number
df.filter(col('country_or_area') == 'Brazil').count()

27463

**SQL Query solution**

In [None]:
# Selecting the country Brazil to calculate number of transactions made
query = '''
        SELECT country_or_area AS pais, COUNT(country_or_area) AS count
        FROM tabela
        WHERE country_or_area = "Brazil"
        GROUP BY country_or_area; 
        '''
spark.sql(query).show(10)

+------+-----+
|  pais|count|
+------+-----+
|Brazil|27463|
+------+-----+



##**2: The number of transactions per flow type and year.**

**Dataframe Solution**

In [None]:
# Grouping by year and flow to count the number of transactions
df.groupBy(col('year'), col('flow')).count().show(10)

+----+---------+-----+
|year|     flow|count|
+----+---------+-----+
|2011|   Import|31301|
|2006|   Import|32301|
|1994|Re-Export| 1079|
|2009|   Export|17825|
|1998|   Import|24881|
|2012|   Export|17863|
|1999|Re-Export| 2046|
|2006|   Export|18585|
|2015|Re-Import| 1479|
|2014|Re-Import| 1457|
+----+---------+-----+
only showing top 10 rows



**SQL Query solution**

In [None]:
# Selecting and grouping by year and flow to count the number of transactions made
query = '''
        SELECT year, flow, COUNT(year, flow)
        FROM tabela
        GROUP BY year, flow; 
        '''
spark.sql(query).show(10)

+----+---------+-----------------+
|year|     flow|count(year, flow)|
+----+---------+-----------------+
|2011|   Import|            31301|
|2006|   Import|            32301|
|1994|Re-Export|             1079|
|2009|   Export|            17825|
|1998|   Import|            24881|
|2012|   Export|            17863|
|1999|Re-Export|             2046|
|2006|   Export|            18585|
|2015|Re-Import|             1479|
|2014|Re-Import|             1457|
+----+---------+-----------------+
only showing top 10 rows



##**3: The average of commodity values per year.**

**Dataframe Solution**

In [None]:
# Grouping by year to calculate the average commodity price
df.groupBy(col('year'))\
  .agg( 
       avg(col('trade_usd')).alias('MEDIA'))\
  .orderBy(col('year').asc()).show(10)

+----+--------------------+
|year|               MEDIA|
+----+--------------------+
|1988| 1.864297055638571E7|
|1989|1.1263871329920229E7|
|1990| 1.172426586778952E7|
|1991| 1.306922385515173E7|
|1992|   9402960.863025468|
|1993|1.0353959855309162E7|
|1994|1.1350325049077941E7|
|1995|1.2286454103356835E7|
|1996|1.1945524161286663E7|
|1997|   9549881.214776853|
+----+--------------------+
only showing top 10 rows



**SQL Query solution**

In [None]:
# Selecting year and the average commodity price to display in ascending order
query = '''
        SELECT year, 
               AVG(trade_usd) AS media
        FROM tabela
        GROUP BY year
        ORDER BY year; 
        '''
spark.sql(query).show(10)

+----+--------------------+
|year|               media|
+----+--------------------+
|1988| 1.864297055638571E7|
|1989|1.1263871329920229E7|
|1990| 1.172426586778952E7|
|1991| 1.306922385515173E7|
|1992|   9402960.863025468|
|1993|1.0353959855309162E7|
|1994|1.1350325049077941E7|
|1995|1.2286454103356835E7|
|1996|1.1945524161286663E7|
|1997|   9549881.214776853|
+----+--------------------+
only showing top 10 rows



##**4: The average price of commodities per unit type, year, and category in the export flow in Brazil.**

**Dataframe Solution**

In [None]:
# Filtering to pick only the transations from the Export flow made by Brazil
# Grouping by year, unit_type and category to then calculate the average commodity price
df.filter(
    (col('flow') == 'Export') & (col('country_or_area') == 'Brazil')) \
    .groupBy(col('year'),
             col('quantity_name').alias('unit_type'),
             col('category')) \
    .agg(avg('trade_usd').alias('average')).orderBy(col('year').desc(), col('unit_type').desc(), col('category').desc()).show(10)

+----+-------------------+--------------------+--------------------+
|year|          unit_type|            category|             average|
+----+-------------------+--------------------+--------------------+
|2016|Weight in kilograms|97_works_of_art_c...|            305909.0|
|2016|Weight in kilograms|96_miscellaneous_...|           2152944.5|
|2016|Weight in kilograms|95_toys_games_spo...|            667820.0|
|2016|Weight in kilograms|94_furniture_ligh...|   5116585.142857143|
|2016|Weight in kilograms|91_clocks_and_wat...|              1672.0|
|2016|Weight in kilograms|88_aircraft_space...|        1.69835892E8|
|2016|Weight in kilograms|87_vehicles_other...|1.4383903633333334E8|
|2016|Weight in kilograms|86_railway_tramwa...|           4160027.0|
|2016|Weight in kilograms|83_miscellaneous_...| 4.189034026666667E8|
|2016|Weight in kilograms|82_tools_implemen...|           1865728.5|
+----+-------------------+--------------------+--------------------+
only showing top 10 rows



**SQL Query solution**

In [None]:
# Selecting year, unit_type and category in the Export flow in Brazil
# To calculate the average commodity price
query = '''
        SELECT year, 
               quantity_name AS unit_type, 
               category, 
               AVG(trade_usd) AS media
        FROM tabela
        WHERE flow = "Export" AND country_or_area = "Brazil"
        GROUP BY year, unit_type, category
        ORDER BY year DESC, unit_type DESC, category DESC; 
        '''
spark.sql(query).show(10)

+----+-------------------+--------------------+--------------------+
|year|          unit_type|            category|               media|
+----+-------------------+--------------------+--------------------+
|2016|Weight in kilograms|97_works_of_art_c...|            305909.0|
|2016|Weight in kilograms|96_miscellaneous_...|           2152944.5|
|2016|Weight in kilograms|95_toys_games_spo...|            667820.0|
|2016|Weight in kilograms|94_furniture_ligh...|   5116585.142857143|
|2016|Weight in kilograms|91_clocks_and_wat...|              1672.0|
|2016|Weight in kilograms|88_aircraft_space...|        1.69835892E8|
|2016|Weight in kilograms|87_vehicles_other...|1.4383903633333334E8|
|2016|Weight in kilograms|86_railway_tramwa...|           4160027.0|
|2016|Weight in kilograms|83_miscellaneous_...| 4.189034026666667E8|
|2016|Weight in kilograms|82_tools_implemen...|           1865728.5|
+----+-------------------+--------------------+--------------------+
only showing top 10 rows



##**5: The maximum, minimum, and mean transaction price per unit type and year.**

**Dataframe solution**

In [None]:
# Grouping by year and unit_type 
# Then calculate the max, min and average commodity price
df.groupBy(col('year'), col('quantity_name').alias('unit_type'))\
  .agg(min('trade_usd').alias('MIN'), 
       max('trade_usd').alias('MAX'), 
       avg('trade_usd').alias('MEDIA'))\
  .orderBy(col('year').desc(), col('unit_type').desc()).show(10)

+----+--------------------+-------+-----------+--------------------+
|year|           unit_type|    MIN|        MAX|               MEDIA|
+----+--------------------+-------+-----------+--------------------+
|2016| Weight in kilograms|      1|54041714444|2.9000750044637196E7|
|2016|    Weight in carats|7957993|    9557468|           8757730.5|
|2016|    Volume in litres|     11| 1547191989| 2.819293736598891E7|
|2016|Volume in cubic m...|    203| 4052653026| 4.540399222794118E7|
|2016|  Thousands of items|   1500|    8554139|           2027251.0|
|2016|     Number of pairs|     20| 1865315579| 3.934150441324201E7|
|2016|  Number of packages|   2666|  115285573|   6871851.043478261|
|2016|     Number of items|      1|19782901523| 3.544705415630021E7|
|2016|    Length in metres|     19|     961206|    74562.9512195122|
|2016|Electrical energy...|1128262| 1065282687|     2.33217751375E8|
+----+--------------------+-------+-----------+--------------------+
only showing top 10 rows



**SQL Query solution**

In [None]:
# Selecting year and unit_type
# To find the max, min and average commodity value
query = '''
        SELECT year, 
               quantity_name AS unit_type, 
               MAX(trade_usd) AS MIN,
               MIN(trade_usd) AS MAX, 
               AVG(trade_usd) AS MEDIA
        FROM tabela
        GROUP BY year, unit_type
        ORDER BY year DESC, unit_type DESC; 
        '''
spark.sql(query).show(10)

+----+--------------------+-----------+-------+--------------------+
|year|           unit_type|        MIN|    MAX|               MEDIA|
+----+--------------------+-----------+-------+--------------------+
|2016| Weight in kilograms|54041714444|      1|2.9000750044637196E7|
|2016|    Weight in carats|    9557468|7957993|           8757730.5|
|2016|    Volume in litres| 1547191989|     11| 2.819293736598891E7|
|2016|Volume in cubic m...| 4052653026|    203| 4.540399222794118E7|
|2016|  Thousands of items|    8554139|   1500|           2027251.0|
|2016|     Number of pairs| 1865315579|     20| 3.934150441324201E7|
|2016|  Number of packages|  115285573|   2666|   6871851.043478261|
|2016|     Number of items|19782901523|      1| 3.544705415630021E7|
|2016|    Length in metres|     961206|     19|    74562.9512195122|
|2016|Electrical energy...| 1065282687|1128262|     2.33217751375E8|
+----+--------------------+-----------+-------+--------------------+
only showing top 10 rows



##**6: The country with the largest average commodity price in the Export flow.**

**Dataframe Solution**

In [None]:
# Filtering the file to pick only the transactions from the Export flow.
# Calculating the average price and limiting by 1 to find the highest value
df.filter(
    (col('flow') == 'Export')) \
    .groupBy(col('country_or_area').alias('country'))\
    .agg( 
       avg('trade_usd').alias('MEDIA'))\
    .orderBy(col('MEDIA').desc()).limit(1).show(10)

+-------+--------------------+
|country|               MEDIA|
+-------+--------------------+
| Angola|1.636966606814285...|
+-------+--------------------+



**SQL Query solution**

In [None]:
# Selecting the country and average price where flow is Export
# Limiting by 1 to find the highest value and country
query = '''
        SELECT country_or_area AS country, 
               AVG(trade_usd) AS media
        FROM tabela
        WHERE flow = "Export"
        GROUP BY country_or_area
        ORDER BY media DESC
        LIMIT 1; 
        '''
spark.sql(query).show(10)

+-------+--------------------+
|country|               media|
+-------+--------------------+
| Angola|1.636966606814285...|
+-------+--------------------+



##**7: The most commercialized commodity (summing the quantities) in 2016, per flow type.**

**Dataframe Solution**

In [None]:
# Selecting flow, commodity to sum the quantities in 2016
df1 = df.filter(
    (col('year') == '2016')) \
    .groupBy(col('flow').alias('flow1'), col('commodity'))\
    .agg( 
       sum(('quantity')).alias('Quant'))

# Dataframe that holds the max value of the quantity
df2 = df1.groupBy(col('flow1').alias('flow2'))\
    .agg( 
       max('Quant').alias('Max quant'))

# Condition to perform the inner join operation
cond = [col('flow1') == col('flow2'), col('Quant') == col('Max quant')]

# Inner Join
df1.join(df2, cond, 'inner')\
        .select(col('flow1').alias('flow'), col('commodity'), col('Quant'))\
        .orderBy(col('flow')).show(10)

+---------+--------------------+----------------+
|     flow|           commodity|           Quant|
+---------+--------------------+----------------+
|   Export|Iron ore, concent...|3.79546246752E11|
|   Import|Petroleum oils, o...|2.58289373308E11|
|Re-Export|Safety razor blad...|      1.261968E9|
|Re-Import|Chem wood pulp, s...|     3.8774873E7|
+---------+--------------------+----------------+



**SQL Query solution**

In [None]:
# Selecting flow, commodity and amount
# Doing an inner join of subqueries to find the highest commodity in each flow type

# First query: Summing the quantities
# Second query: Getting the max value of the quantities
query = '''
        SELECT t1.flow, t1.commodity, t1.quantity as amount
        FROM  (
          SELECT commodity, flow, SUM(quantity) AS quantity
          FROM tabela
          WHERE year = '2016'
          GROUP BY flow, commodity
        ) as t1
        INNER JOIN  (
          SELECT flow, MAX(quantity) AS quantity
          FROM  (
            SELECT commodity, flow, SUM(quantity) AS quantity
            FROM tabela
            WHERE year = '2016'
            GROUP BY flow, commodity
          )
          GROUP BY flow
        ) as t2
        ON t1.flow = t2.flow
        AND t1.quantity = t2.quantity
        ORDER BY t1.flow
        '''
spark.sql(query).show(10)

+---------+--------------------+----------------+
|     flow|           commodity|          amount|
+---------+--------------------+----------------+
|   Export|Iron ore, concent...|3.79546246752E11|
|   Import|Petroleum oils, o...|2.58289373308E11|
|Re-Export|Safety razor blad...|      1.261968E9|
|Re-Import|Chem wood pulp, s...|     3.8774873E7|
+---------+--------------------+----------------+

