In [None]:
! pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, to_date, lit, when
from pyspark.sql import functions as f
# Create (or reuse) the Spark session shared by every cell below.
spark = SparkSession.builder.appName("Dataframe Reporting").getOrCreate()

# NOTE(review): looks like a Google Drive mount in Colab — adjust when running elsewhere.
datasets_path = '/content/drive/MyDrive/PMN/data/'

# Every input is a CSV with a header row; Spark infers the column types.
csv_options = {"header": True, "inferSchema": True}

df_full = spark.read.csv(datasets_path + 'output_csv_full.csv', **csv_options)
df_country = spark.read.csv(datasets_path + 'country_classification.csv', **csv_options)
data_service = spark.read.csv(datasets_path + 'services_classification.csv', **csv_options)
data_good = spark.read.csv(datasets_path + 'goods_classification.csv', **csv_options)


In [None]:
# Preview the services classification table (code -> service_label).
data_service.show(n=20)

+-------+--------------------+
|   code|       service_label|
+-------+--------------------+
|    A12|            Services|
|  A1202|Maintenance and r...|
|  A1203|      Transportation|
| A12031|       Sea transport|
|A120312|             Freight|
|A120313|               Other|
| A12032|       Air transport|
|A120321|Air passenger tra...|
|A120322|Air freight trans...|
|A120323| Air other transport|
| A12034|Postal and courie...|
|  A1204|              Travel|
| A12041|     Business travel|
| A12042|     Personal travel|
|A120421|Education related...|
|A120422|Health related tr...|
|A120423|Other personal tr...|
|  A1205|Construction serv...|
|  A1206|Insurance and pen...|
| A12062|         Reinsurance|
+-------+--------------------+
only showing top 20 rows



In [None]:
# Question 1: Convertir le format de la date '202206' vers '01/06/2022'
def question_1(df, time_col="time_ref"):
    """Add a ``date`` column parsed from a ``YYYYMM`` period column.

    For example ``202206`` becomes the date 2022-06-01 (first day of the month).
    The result is a Spark DateType rather than a formatted string, so that
    ``f.year`` and other date functions can be applied to it.

    Args:
        df: input DataFrame containing ``time_col``.
        time_col: name of the period column (default ``"time_ref"``).

    Returns:
        ``df`` with an extra ``date`` column; values that do not parse as
        ``yyyyMM`` yield null.
    """
    # With inferSchema=True the period is read as an integer, so cast it
    # explicitly instead of relying on an implicit int->string conversion.
    period = col(time_col).cast("string")
    # A missing day-of-month defaults to 1 when parsing with "yyyyMM".
    return df.withColumn("date", to_date(period, "yyyyMM"))

In [None]:
# Question 2: Extraire l'année
def question_2(df):
    """Return ``df`` with a ``year`` column extracted from the ``date`` column."""
    year_of_date = f.year(col("date"))
    return df.withColumn("year", year_of_date)


In [None]:
# Question 3-4: add the country name (join with df_country) and the details_good flag.
def question_3(df, countries=None):
    """Left-join a country lookup table onto ``df`` by ``country_code``.

    Args:
        df: trade rows with a ``country_code`` column.
        countries: lookup table joined on ``country_code``; defaults to the
            ``df_country`` frame loaded from ``country_classification.csv``.

    Returns:
        ``df`` with the lookup's remaining columns appended (presumably
        including ``country_label``, which later questions group by). Rows
        without a match keep null in those columns.
    """
    if countries is None:
        countries = df_country
    return df.join(countries, on="country_code", how="left")


def question_4(df):
    """Add ``details_good``: 1 when ``product_type`` is "Goods", 0 otherwise.

    The notebook calls ``question_4(r3)``, and questions 6-8 and 13-16 sum
    ``details_good``, so this column must exist. It mirrors the
    ``details_service`` flag added by ``question_5``. Null ``product_type``
    values map to 0.
    """
    is_good = col("product_type") == "Goods"
    return df.withColumn("details_good", when(is_good, 1).otherwise(0))

In [None]:

# Question 5: Ajouter une colonne is_services, nommée details_service dans le code (1 si Services, 0 sinon)
def question_5(df):
    """Add ``details_service``: 1 when ``product_type`` is "Services", else 0 (null included)."""
    is_service = col("product_type") == "Services"
    flag = when(is_service, 1).otherwise(0)
    return df.withColumn("details_service", flag)


In [None]:
from pyspark.sql.functions import sum as _sum
# Question 6 : Classer les pays Exporteurs par Services et Goods
def question_6(df):
    """Rank exporting countries by their summed goods and services flags.

    Keeps only rows with ``account == 'Exports'``, sums ``details_good`` and
    ``details_service`` per ``country_label``, then sorts by ``total_goods``
    and then ``total_service``, both descending.
    """
    exports_only = df.where(col("account") == 'Exports')
    per_country = exports_only.groupBy('country_label').agg(
        _sum('details_good').alias('total_goods'),
        _sum('details_service').alias('total_service'),
    )
    return per_country.sort(col("total_goods").desc(), col("total_service").desc())

In [None]:
# Question 7 : Classer les pays Importeurs par Services et Goods
def question_7(df):
    """Rank importing countries by their summed goods and services flags.

    Keeps only rows with ``account == 'Imports'``, sums ``details_good`` and
    ``details_service`` per ``country_label``, then sorts by ``total_goods``
    and then ``total_service``, both descending.
    """
    imports_only = df.where(col("account") == 'Imports')
    per_country = imports_only.groupBy('country_label').agg(
        _sum('details_good').alias('total_goods'),
        _sum('details_service').alias('total_service'),
    )
    return per_country.sort(col("total_goods").desc(), col("total_service").desc())

In [None]:
# Question 8 : regroupement par good
def question_8(df):
    """Sum ``details_good`` per ``country_label`` over all accounts, largest total first."""
    goods_per_country = df.groupBy('country_label').agg(
        _sum('details_good').alias('total_goods')
    )
    return goods_per_country.sort(col("total_goods").desc())



In [None]:
# Question 9 : regroupement des pays par service
def question_9(df):
    """Sum ``details_service`` per ``country_label`` over all accounts, largest total first."""
    services_per_country = df.groupBy('country_label').agg(
        _sum('details_service').alias('total_service')
    )
    return services_per_country.sort(col("total_service").desc())

In [None]:
# Question 10 : la liste des services exportés de la France
def question_10(df):
    """Return the ``Exports`` rows of product type ``Services`` whose ``country_code`` is "FR"."""
    is_export = col("account") == "Exports"
    is_service = col("product_type") == "Services"
    is_france = col("country_code") == "FR"
    return df.filter(is_export & is_service & is_france)


In [None]:
# Question 11 : la liste des services importés de la France
def question_11(df):
    """Return the ``Imports`` rows of product type ``Goods`` whose ``country_code`` is "FR".

    NOTE(review): the question header asks for imported *services*, but this
    filter selects ``Goods`` — confirm which one is intended.
    """
    is_import = col("account") == "Imports"
    is_good = col("product_type") == "Goods"
    is_france = col("country_code") == "FR"
    return df.filter(is_import & is_good & is_france)


In [None]:
# Question 12 : classement des services les moins demandés
def question_12(df):
    """Rank countries by their summed ``details_service`` over ``Services`` rows, smallest first.

    NOTE(review): this ranks countries, not individual services (no grouping
    by service code) — confirm that matches "services les moins demandés".
    """
    service_rows = df.where(col("product_type") == "Services")
    totals = service_rows.groupBy("country_label").agg(
        _sum("details_service").alias("total_service")
    )
    return totals.sort(col("total_service").asc())


In [None]:
# Question 13 : classement des goods les plus demandés
def question_13(df):
    """Rank countries by their summed ``details_good`` over ``Goods`` rows, largest first.

    NOTE(review): this ranks countries, not individual goods — confirm intent.
    """
    goods_rows = df.where(col("product_type") == "Goods")
    totals = goods_rows.groupBy("country_label").agg(
        _sum("details_good").alias("total_goods")
    )
    return totals.sort(col("total_goods").desc())

In [None]:
# Question 14 : Ajouter la colonne status_import_export (négative si import > export, sinon positive par pays)
def question_14(df):
    """Per country, flag whether imports exceed exports.

    Adds ``status_import_export``: "negative" when the summed ``value`` of
    ``Imports`` rows is greater than that of ``Exports`` rows, and "positive"
    otherwise (ties count as positive). This compares imports with exports,
    as the question asks, rather than goods with services.

    Args:
        df: trade rows with ``country_label``, ``account`` ("Imports" /
            "Exports"), ``value``, ``details_good`` and ``details_service``.

    Returns:
        One row per ``country_label`` with ``total_goods``, ``total_service``
        (summed flags, kept for existing callers), ``total_imports``,
        ``total_exports`` and ``status_import_export``.
    """
    def _account_total(account):
        # A country with no rows for this account would sum to null; use 0
        # so the comparison below is always defined.
        return f.coalesce(_sum(when(col("account") == account, col("value"))), lit(0))

    grouped_df = df.groupby("country_label").agg(
        _sum("details_good").alias("total_goods"),
        _sum("details_service").alias("total_service"),
        _account_total("Imports").alias("total_imports"),
        _account_total("Exports").alias("total_exports"),
    )
    return grouped_df.withColumn(
        "status_import_export",
        when(col("total_imports") > col("total_exports"), "negative").otherwise("positive"),
    )


In [None]:
# Question 15 : Ajouter la colonne difference_import_export
def question_15(df):
    """Per country, compute ``difference_import_export`` = imported value - exported value.

    Args:
        df: trade rows with ``country_label``, ``account`` ("Imports" /
            "Exports"), ``value``, ``details_good`` and ``details_service``.

    Returns:
        One row per ``country_label`` with:
          - ``total_goods`` and ``total_service`` (summed flags, kept so that
            code reading these columns keeps working);
          - ``total_imports`` and ``total_exports`` (summed ``value``, 0 when
            a country has no rows for that account);
          - ``difference_import_export``.
    """
    def _account_total(account):
        # A country with no rows for this account would sum to null; use 0.
        return f.coalesce(_sum(when(col("account") == account, col("value"))), lit(0))

    grouped_df = df.groupby("country_label").agg(
        _sum("details_good").alias("total_goods"),
        _sum("details_service").alias("total_service"),
        _account_total("Imports").alias("total_imports"),
        _account_total("Exports").alias("total_exports"),
    )
    return grouped_df.withColumn(
        "difference_import_export", col("total_imports") - col("total_exports")
    )

In [None]:
# Question 16 : Ajouter la colonne Somme_good
def question_16(df):
    """Return one row per ``country_label`` with ``Somme_good`` = sum of ``details_good``."""
    somme_good = _sum("details_good").alias("Somme_good")
    return df.groupBy("country_label").agg(somme_good)

In [None]:
# Question 17 : Ajouter la colonne Somme_service
def question_17(df):
    """Return one row per ``country_label`` with ``Somme_service`` = sum of ``details_service``."""
    somme_service = _sum("details_service").alias("Somme_service")
    return df.groupBy("country_label").agg(somme_service)

In [None]:
# Question 18 : Ajouter la colonne pourcentages_good (pourcentage de la colonne good par rapport à tous les goods d'un seul pays)
def question_18(df):
    """Add ``pourcentages_good`` = difference_import_export / total_goods * 100.

    Expects ``difference_import_export`` and ``total_goods`` columns, as
    produced by ``question_15``. The column is referenced as ``total_goods``,
    exactly as aliased, instead of relying on case-insensitive resolution.
    Rows whose ``total_goods`` is 0 or null get null instead of a division by
    zero, which would raise when ANSI mode is enabled.

    NOTE(review): the question asks for each good's share of all goods in a
    country; this ratio does not obviously compute that — confirm the formula.
    """
    ratio = (col("difference_import_export") / col("total_goods")) * 100
    return df.withColumn("pourcentages_good", when(col("total_goods") != 0, ratio))

In [None]:
# Question 19 : Ajouter la colonne pourcentages_service (pourcentage de la colonne service par rapport à tous les services d'un seul pays)
def question_19(df):
    """Add ``pourcentages_service`` = difference_import_export / total_service * 100.

    Expects ``difference_import_export`` and ``total_service`` columns, as
    produced by ``question_15``. The column is referenced as
    ``total_service``, exactly as aliased, instead of ``total_Service``,
    which only resolved because Spark is case-insensitive by default. Rows
    whose ``total_service`` is 0 or null get null instead of a division by
    zero.

    NOTE(review): confirm this ratio matches the intended "percentage of
    services" definition.
    """
    ratio = (col("difference_import_export") / col("total_service")) * 100
    return df.withColumn("pourcentages_service", when(col("total_service") != 0, ratio))

In [None]:
# Q1: add the parsed `date` column to the raw trade data.
r1 = question_1(df=df_full)



In [None]:
# Q2: add the `year` column derived from `date`.
r2 = question_2(df=r1)

In [None]:
# Q3: left-join the country lookup on country_code.
r3 = question_3(df=r2)

In [None]:
# Q4: apply question_4 to the country-enriched data.
r4 = question_4(df=r3)


In [None]:
# Q5: add the details_service flag; r5 is the base for every later question.
r5 = question_5(df=r4)

In [None]:
# Q6: exporting countries ranked by goods then services.
r6 = question_6(df=r5)

In [None]:
# Q7: importing countries ranked by goods then services.
r7 = question_7(df=r5)

In [None]:
# Q8: countries ranked by summed details_good.
r8 = question_8(df=r5)

In [None]:
# Q9: countries ranked by summed details_service.
r9 = question_9(df=r5)

In [None]:
# Q10: exported services rows for country_code FR.
r10 = question_10(df=r5)

In [None]:
# Q11: imported goods rows for country_code FR.
r11 = question_11(df=r5)

In [None]:
# Q12: countries ranked by services total, smallest first.
r12 = question_12(df=r5)

In [None]:
# Q13: countries ranked by goods total, largest first.
r13 = question_13(df=r5)

In [None]:
# Q14: per-country import/export status (must call question_14, not question_13).
r14 = question_14(r5)

In [None]:
# Q15: per-country difference_import_export.
r15 = question_15(df=r5)

In [None]:
# Q16: per-country Somme_good.
r16 = question_16(df=r5)

In [None]:
# Preview the first rows of the raw trade data.
df_full.show(n=20)

+--------+-------+----+------------+------------+-------------+------+
|time_ref|account|code|country_code|product_type|        value|status|
+--------+-------+----+------------+------------+-------------+------+
|  202206|Exports|  00|          AE|       Goods| 2.18885468E8|     F|
|  202206|Exports|  00|          AG|       Goods|     253686.0|     F|
|  202206|Exports|  00|          AI|       Goods|      14070.0|     F|
|  202206|Exports|  00|          AL|       Goods|     260451.0|     F|
|  202206|Exports|  00|          AM|       Goods|     700191.0|     F|
|  202206|Exports|  00|          AO|       Goods|    4405208.0|     F|
|  202206|Exports|  00|          AR|       Goods|    8006648.0|     F|
|  202206|Exports|  00|          AS|       Goods|    7290217.0|     F|
|  202206|Exports|  00|          AT|       Goods|  1.0618267E7|     F|
|  202206|Exports|  00|          AU|       Goods|2.161743586E9|     F|
|  202206|Exports|  00|          AW|       Goods|     122920.0|     F|
|  202