In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

In [62]:
df_accounts = spark.read \
    .format("csv") \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",";") \
    .load('./data/accounts-original.csv')

df_accounts.limit(5).show()

+---+----------+---------+---+-------+
| id|first_name|last_name|age|country|
+---+----------+---------+---+-------+
|  1|     Darcy| Phillips| 24|     YE|
|  2|    Amelia|   Wright| 66|     CN|
|  3|     Haris|    Ellis| 61|     CR|
|  4|      Tony|     Hall| 51|     JO|
|  5|     Rubie|  Stewart| 27|     RO|
+---+----------+---------+---+-------+



In [14]:
df_country_abbreviation = spark.read.format("csv") \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("delimiter",";") \
    .load('./data/country_abbreviation-original.csv')

# df_country_abbreviation.show()

# df_ch = df_country_abbreviation.filter(df_country_abbreviation.country_full_name == 'Switzerland') 

df_ch = df_country_abbreviation.filter("country_full_name == 'Switzerland'") 

df_ch.show()

+-----------------+------------+
|country_full_name|abbreviation|
+-----------------+------------+
|      Switzerland|          CH|
+-----------------+------------+



In [61]:
df_transactions = spark.read \
    .format("csv") \
    .option("header","true") \
    .option("inferSchema","true").option("delimiter",";") \
    .load('./data/transactions-original.csv')

df_transactions.limit(5).show()

+------+-------+------------+----------------+-------+
|    id| amount|account_type|transaction_date|country|
+------+-------+------------+----------------+-------+
|179528|-730.86|    Business|      2013-07-10|     SV|
|378343|-946.98|    Personal|      2018-04-06|     YE|
| 75450|7816.92|Professional|      2016-11-20|     SI|
|357719| 704.02|    Business|      2016-11-06|     ID|
|110511| 3462.6|    Personal|      2018-01-18|     BS|
+------+-------+------------+----------------+-------+



In [60]:
df_transactions_ch = df_transactions.sort('id').where('country == "CH"').limit(5)
df_transactions_ch.show()

+---+-------+------------+----------------+-------+
| id| amount|account_type|transaction_date|country|
+---+-------+------------+----------------+-------+
|  8|3481.48|Professional|      2015-09-29|     CH|
| 19|5630.04|Professional|      2017-01-04|     CH|
| 20|-432.68|Professional|      2020-03-16|     CH|
| 48|7814.77|Professional|      2020-09-09|     CH|
| 51|8011.46|    Business|      2020-12-21|     CH|
+---+-------+------------+----------------+-------+



In [63]:
df_accounts.select('id','first_name','last_name').where('country == "CH"').limit(5).show()

+---+----------+---------+
| id|first_name|last_name|
+---+----------+---------+
| 23| Frederick| Morrison|
|108|Maximilian|  Chapman|
|158|     Spike|   Taylor|
|398|   Chester|Henderson|
|443|      Myra|  Perkins|
+---+----------+---------+



In [54]:
from pyspark.sql.functions import sum

df_balance = df_transactions \
    .groupBy('id', 'transaction_date') \
    .agg(
        sum('amount') \
            .alias('total_amount')) \
    .where(sum('amount') > 0) \
    .sort(df_transactions.id)

df_balance.show()

+---+----------------+------------+
| id|transaction_date|total_amount|
+---+----------------+------------+
|  1|      2021-04-20|        3.29|
|  1|      2020-10-14|      3131.4|
|  1|      2018-03-18|     9153.86|
|  1|      2019-05-08|     8268.84|
|  1|      2017-06-07|     7571.59|
|  1|      2020-12-24|     5118.27|
|  1|      2021-08-19|     7831.55|
|  1|      2011-04-08|     9970.67|
|  1|      2012-06-01|     1280.65|
|  2|      2012-04-06|      6546.8|
|  2|      2013-07-25|      1427.1|
|  2|      2012-03-03|     8353.12|
|  2|      2016-09-19|     2138.24|
|  2|      2021-01-27|      6270.4|
|  2|      2012-01-30|     7228.96|
|  2|      2019-11-16|      9985.4|
|  2|      2012-03-04|     8009.27|
|  2|      2021-02-25|     4904.86|
|  2|      2016-08-19|     4419.15|
|  3|      2017-04-19|     4110.29|
+---+----------------+------------+
only showing top 20 rows



In [56]:
df_transactions.where("id == 1").show()

+---+-------+------------+----------------+-------+
| id| amount|account_type|transaction_date|country|
+---+-------+------------+----------------+-------+
|  1|8268.84|    Business|      2019-05-08|     ZW|
|  1|-420.37|Professional|      2012-12-04|     LK|
|  1|9970.67|Professional|      2011-04-08|     PY|
|  1|   3.29|    Business|      2021-04-20|     MW|
|  1| 3131.4|    Business|      2020-10-14|     MX|
|  1|9153.86|    Personal|      2018-03-18|     BB|
|  1|5118.27|    Personal|      2020-12-24|     DO|
|  1|7831.55|Professional|      2021-08-19|     MR|
|  1|7571.59|    Personal|      2017-06-07|     CI|
|  1|1280.65|    Business|      2012-06-01|     IR|
+---+-------+------------+----------------+-------+



In [38]:
from_ch = df_ch \
    .join(df_accounts, df_ch.abbreviation == df_accounts.country, 'inner') \
    .select(df_accounts.id, df_accounts.first_name, df_accounts.last_name, df_accounts.country) \

from_ch.show()

+----+----------+---------+-------+
|  id|first_name|last_name|country|
+----+----------+---------+-------+
|  23| Frederick| Morrison|     CH|
| 108|Maximilian|  Chapman|     CH|
| 158|     Spike|   Taylor|     CH|
| 398|   Chester|Henderson|     CH|
| 443|      Myra|  Perkins|     CH|
| 662|     Haris|     Hill|     CH|
| 815|    Rafael|    Owens|     CH|
| 816|     David|  Russell|     CH|
|1177|     Amber|    Baker|     CH|
|1368|     Byron|   Walker|     CH|
|1468|    Amanda|   Bailey|     CH|
|1618|     Paige|  Barrett|     CH|
|1714|    George| Harrison|     CH|
|1757|  Adrianna|   Tucker|     CH|
|1783|    Wilson| Morrison|     CH|
|1825|   Michael|   Harper|     CH|
|2069|      Alen|    Payne|     CH|
|2076|    Rafael|  Perkins|     CH|
|2331|    Sienna|    Ellis|     CH|
|2339| Frederick|    Baker|     CH|
+----+----------+---------+-------+
only showing top 20 rows



In [45]:
from_ch \
    .join(df_balance, df_balance.id == from_ch.id, 'inner') \
    .select(from_ch.id, from_ch.first_name, from_ch.last_name, df_balance.transaction_date, df_balance.total_amount) \
    .sort(from_ch.id) \
    .show()
    

+---+----------+---------+----------------+------------+
| id|first_name|last_name|transaction_date|total_amount|
+---+----------+---------+----------------+------------+
| 23| Frederick| Morrison|      2016-06-25|     1582.28|
| 23| Frederick| Morrison|      2021-06-16|       512.3|
| 23| Frederick| Morrison|      2020-09-04|     9360.72|
| 23| Frederick| Morrison|      2021-01-11|     9686.86|
| 23| Frederick| Morrison|      2012-10-24|     8707.01|
| 23| Frederick| Morrison|      2016-05-14|     5574.68|
| 23| Frederick| Morrison|      2019-05-10|     7892.81|
|108|Maximilian|  Chapman|      2016-10-17|     2502.55|
|108|Maximilian|  Chapman|      2019-06-12|     6172.79|
|108|Maximilian|  Chapman|      2016-03-31|     8899.88|
|108|Maximilian|  Chapman|      2012-04-13|     3725.31|
|108|Maximilian|  Chapman|      2016-05-18|     6182.78|
|108|Maximilian|  Chapman|      2016-07-05|     7405.78|
|108|Maximilian|  Chapman|      2020-02-16|     9933.39|
|108|Maximilian|  Chapman|     

In [66]:
names_ch_total = from_ch \
    .join(df_balance, df_balance.id == from_ch.id, 'inner') \
    .select(from_ch.id, from_ch.first_name, from_ch.last_name, df_balance.transaction_date, df_balance.total_amount) \
    .where('id = 23 or id = 108') \
    .sort(from_ch.id)

names_ch_total.show()

+---+----------+---------+----------------+------------+
| id|first_name|last_name|transaction_date|total_amount|
+---+----------+---------+----------------+------------+
| 23| Frederick| Morrison|      2016-05-14|     5574.68|
| 23| Frederick| Morrison|      2021-01-11|     9686.86|
| 23| Frederick| Morrison|      2021-06-16|       512.3|
| 23| Frederick| Morrison|      2019-05-10|     7892.81|
| 23| Frederick| Morrison|      2012-10-24|     8707.01|
| 23| Frederick| Morrison|      2020-09-04|     9360.72|
| 23| Frederick| Morrison|      2016-06-25|     1582.28|
|108|Maximilian|  Chapman|      2016-05-11|     7547.69|
|108|Maximilian|  Chapman|      2016-10-17|     2502.55|
|108|Maximilian|  Chapman|      2013-10-12|     2047.12|
|108|Maximilian|  Chapman|      2020-02-16|     9933.39|
|108|Maximilian|  Chapman|      2016-07-05|     7405.78|
|108|Maximilian|  Chapman|      2016-05-18|     6182.78|
|108|Maximilian|  Chapman|      2019-06-12|     6172.79|
|108|Maximilian|  Chapman|     

In [80]:
names_ch_total \
    .groupBy('id','first_name', 'last_name', names_ch_total.transaction_date[0:4].alias('year') ) \
    .agg({"total_amount": "sum"}) \
        .alias('total_amount') \
    .sort('id', 'year') \
    .show()

+---+----------+---------+----+-----------------+
| id|first_name|last_name|year|sum(total_amount)|
+---+----------+---------+----+-----------------+
| 23| Frederick| Morrison|2012|          8707.01|
| 23| Frederick| Morrison|2016|          7156.96|
| 23| Frederick| Morrison|2019|          7892.81|
| 23| Frederick| Morrison|2020|          9360.72|
| 23| Frederick| Morrison|2021|         10199.16|
|108|Maximilian|  Chapman|2012|          3725.31|
|108|Maximilian|  Chapman|2013|          2047.12|
|108|Maximilian|  Chapman|2016|         32538.68|
|108|Maximilian|  Chapman|2019|          6172.79|
|108|Maximilian|  Chapman|2020|          9933.39|
+---+----------+---------+----+-----------------+



In [101]:
from pyspark.sql.functions import concat, col, lit

formatting = names_ch_total \
    .groupBy('id','first_name', 'last_name', names_ch_total.transaction_date[0:4].alias('year') ) \
    .agg(
        sum('total_amount') \
            .alias('total_amount')) \
    .sort('id', 'year')

# formatting.show()

# formatting \
#     .select(concat(col("first_name"), lit(" "), col("last_name"), lit(' '), col('year'), lit(' '), col('total_amount')) \
#             .alias('First Name Last Name Year Total')) \
#     .show(n=5, truncate=False, vertical=True)

formatting \
    .select(concat(col("first_name"), lit(" "), col("last_name"), lit(' '), col('year'), lit(' '), col('total_amount')) \
            .alias('First Name Last Name Year Total')) \
    .show(n=5, truncate=False)

# formatting \
#     .select("first_name", "last_name", 'year', 'total_amount') \
#     .show(n=5, truncate=False, vertical=True)



+--------------------------------+
|First Name Last Name Year Total |
+--------------------------------+
|Frederick Morrison 2012 8707.01 |
|Frederick Morrison 2016 7156.96 |
|Frederick Morrison 2019 7892.81 |
|Frederick Morrison 2020 9360.72 |
|Frederick Morrison 2021 10199.16|
+--------------------------------+
only showing top 5 rows



In [29]:

# Join

# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html 
# https://www.geeksforgeeks.org/pyspark-join-types-join-two-dataframes/

# group by 
# https://sparkbyexamples.com/pyspark/pyspark-groupby-explained-with-example/