In [1]:
from pyspark.sql import SparkSession
import os

In [2]:
# Toggle between a local 4-core session and the Kubernetes-backed session.
local = False

if local:
    spark = (
        SparkSession.builder
        .master("local[4]")
        .appName("aida_poc_etl")
        .getOrCreate()
    )
else:
    # Container image, service account and namespace are read from the
    # environment; a missing variable raises KeyError.
    spark = (
        SparkSession.builder
        .master("k8s://https://kubernetes.default.svc:443")
        .appName("aida_poc_etl")
        .config("spark.kubernetes.container.image", os.environ["IMAGE_NAME"])
        .config(
            "spark.kubernetes.authenticate.driver.serviceAccountName",
            os.environ['KUBERNETES_SERVICE_ACCOUNT'],
        )
        .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])
        .config("spark.executor.instances", "10")
        .config("spark.executor.memory", "16g")
        .config("spark.driver.memory", "32g")
        # Extra JVM dependency: the PostgreSQL JDBC driver.
        .config('spark.jars.packages', 'org.postgresql:postgresql:42.2.24')
        .enableHiveSupport()
        .getOrCreate()
    )



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c5e5e4c0-877f-42b2-99ff-64647e08e4a8;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.24 in central
	found org.checkerframework#checker-qual;3.5.0 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.24/postgresql-42.2.24.jar ...
	[SUCCESSFUL ] org.postgresql#postgresql;42.2.24!postgresql.jar (117ms)
downloading https://repo1.maven.org/maven2/org/checkerframework/checker-qual/3.5.0/checker-qual-3.5.0.jar ...
	[SUCCESSFUL ] org.checkerframework#checker-qual;3.5.0!checker-qual.jar (23ms)
:: resolution report :: resolve 1914ms :: artifacts dl 144ms
	:: modules in use:
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.2.24 from central in [default]
	------------------------------

In [3]:
# Bucket folder holding the project data, and the raw CSV extract inside it.
work_dir = "s3a://projet-poc-aida/rp"
file_name = "individus.csv"
file_path = "/".join((work_dir, file_name))


In [None]:
# Read the semicolon-delimited CSV: the first row is the header and the
# column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True, delimiter=";")
    .csv(file_path)
)

In [5]:
# Preview the first five rows without truncating cell values.
# (df.cache() is left disabled.)
df.show(n=5, truncate=False)

2022-04-27 13:41:32,295 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+----------------+---------------------+-----------------+--------------+-------------------+---------------+------------------+-------------+--------------+------------------+----+----------+-------+----------------+-------+---+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|region_residence|departement_residence|commune_residence|region_travail|departement_travail|commune_travail|commune_anterieure|commune_etude|pays_naissance|poids             |sexe|statut_pro|densite|recherche_emplo

In [8]:
# Print the total size of the source CSV as reported by `mc ls --summarize`.
! mc ls --summarize s3/projet-poc-aida/rp/individus.csv | grep "Total Size"

Total Size: 14 GiB


In [9]:
# Count the rows of the DataFrame loaded from the CSV.
df.count()

                                                                                

50000000

In [10]:
# Show the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- region_residence: integer (nullable = true)
 |-- departement_residence: string (nullable = true)
 |-- commune_residence: string (nullable = true)
 |-- region_travail: integer (nullable = true)
 |-- departement_travail: string (nullable = true)
 |-- commune_travail: string (nullable = true)
 |-- commune_anterieure: string (nullable = true)
 |-- commune_etude: string (nullable = true)
 |-- pays_naissance: integer (nullable = true)
 |-- poids: double (nullable = true)
 |-- sexe: integer (nullable = true)
 |-- statut_pro: string (nullable = true)
 |-- densite: integer (nullable = true)
 |-- recherche_emploi: integer (nullable = true)
 |-- diplome: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- variable00: long (nullable = true)
 |-- variable01: integer (nullable = true)
 |-- variable02: integer (nullable = true)
 |-- variable03: integer (nullable = true)
 |-- variable04: double (nullable = true)
 |-- variable05: double (nullable = true)
 |-- variable06: intege

In [11]:
# Number of partitions of the RDD backing the CSV DataFrame.
partition_number=df.rdd.getNumPartitions()
print(f"Data partition number: {partition_number}")

Data partition number: 109


# Step 1. Convert CSV to Parquet

## 1.1 Write parquet with random partition

In [12]:
# Target folder for the unpartitioned Parquet copy of the CSV data.
output_file = "individus_snappy_parquet"
output_path = "/".join([work_dir, output_file])

# Replace whatever a previous run left at this location.
df.write.parquet(output_path, mode="overwrite")

                                                                                

In [11]:
# Print the total size of the Parquet output folder, for comparison with the CSV.
! mc ls --summarize s3/projet-poc-aida/rp/individus_snappy_parquet | grep "Total Size"

Total Size: 1.1 GiB


## 1.2 Write parquet with specific partition column

The example below partitions the Parquet output by the given column. `partitionBy` also accepts several columns.

In [16]:
# Column used to partition the Parquet output, and the folder it is written to.
col_name = "region_residence"
partition_output_file = "individus_partition_region"
partition_output_path = "/".join([work_dir, partition_output_file])

In [17]:
# Write one sub-folder per distinct value of `col_name`, replacing any previous output.
df.write.parquet(
    partition_output_path,
    mode="overwrite",
    partitionBy=col_name,
)

                                                                                

In [17]:
from pyspark.sql.types import StructType, IntegerType,StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col

# Step 2. Do some analysis

We can use the query below to run a performance test on the CSV, the Parquet, and the partitioned Parquet data.

### Query 1

```sql
select sum(poids), sexe, densite, statut_pro from TABLE where region_residence='44' and region_travail!='44' and statut_pro!='Z' group by sexe, densite, statut_pro
```

In [18]:
import time;


def run_query_1(df):
    """Run benchmark Query 1 on ``df`` and print how long it took.

    Keeps the rows where ``region_residence`` is 44, ``region_travail`` is
    not 44 and ``statut_pro`` is not "Z", shows them, then shows
    ``sum(poids)`` grouped by ``sexe``, ``densite`` and ``statut_pro``.

    Args:
        df: Spark DataFrame exposing the columns named above.

    Returns:
        None. Both tables and the elapsed time are written to the cell output.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments the way a time.time() difference can.
    start = time.perf_counter()
    filtered = (
        df.filter(
            (col("region_residence") == 44)
            & (col("region_travail") != 44)
            & (col("statut_pro") != "Z")
        )
        .select("region_residence", "region_travail", "poids", "sexe", "densite", "statut_pro")
    )
    print("Data frame after filter")
    filtered.show()
    print("Final resutl data frame")
    # `sum` is the pyspark.sql.functions aggregate imported by the notebook,
    # not the Python builtin.
    df_resu1 = filtered.groupBy("sexe", "densite", "statut_pro").agg(sum(col("poids")))
    df_resu1.show()
    # The measured span covers both show() calls above.
    end = time.perf_counter()
    print(f"Total time spent: {end-start}")
    

### 2.1 Test query with csv 

In [19]:
# Baseline: run Query 1 on the DataFrame read from the CSV file.
run_query_1(df)

Data frame after filter
+----------------+--------------+------------------+----+-------+----------+
|region_residence|region_travail|             poids|sexe|densite|statut_pro|
+----------------+--------------+------------------+----+-------+----------+
|              44|            52|0.9173226192328938|   2|      4|         1|
|              44|            27|1.0844752868227385|   2|      4|         1|
|              44|            84|1.4552633116136917|   1|      4|         1|
|              44|            32|1.2880844787953543|   1|      3|         2|
|              44|            76|0.8567941093656907|   1|      4|         1|
|              44|            75| 1.360188435113091|   1|      4|         1|
|              44|            76|0.8156473498675924|   2|      2|         1|
|              44|            75|1.2422037759139486|   2|      4|         1|
|              44|            76|0.5489129674794979|   2|      4|         2|
|              44|            84| 1.420871377293334|



+----+-------+----------+------------------+
|sexe|densite|statut_pro|        sum(poids)|
+----+-------+----------+------------------+
|   1|      4|         1| 84860.02165580635|
|   1|      3|         1| 81647.73008880654|
|   1|      3|         2|20318.627070021386|
|   2|      2|         2| 4594.674335848757|
|   2|      4|         2| 21197.94139432258|
|   2|      3|         1|  81280.1828071123|
|   2|      4|         1| 85804.86044169618|
|   1|      1|         1|2422.4620618296367|
|   2|      3|         2|  20696.8599381208|
|   1|      4|         2| 21014.73402813378|
|   1|      2|         1|18598.935870467652|
|   2|      1|         1| 2403.512048985781|
|   2|      2|         1| 18421.38674213923|
|   1|      2|         2|4560.4938174609615|
|   1|      1|         2|  607.757923308453|
|   2|      1|         2| 592.3437398684349|
+----+-------+----------+------------------+

Total time spent: 36.96817231178284


                                                                                

### 2.2 Test query with parquet


In [4]:
# Locations of the datasets written in Step 1. The folder names must match the
# ones used by the write cells of sections 1.1 and 1.2; the partitioned folder
# previously pointed at "individus-region-residence.parquet", which this
# notebook never writes.
parquet_file_name="individus_snappy_parquet"
partition_parquet_name="individus_partition_region"
parquet_path=f"{work_dir}/{parquet_file_name}"

partition_parquet_path=f"{work_dir}/{partition_parquet_name}"

In [5]:
# Load the unpartitioned Parquet dataset.
df_parquet=spark.read.parquet(parquet_path)

                                                                                

In [6]:
# Run Query 1 on the unpartitioned Parquet data.
# The cell defining run_query_1 must have been executed in the current kernel first.
run_query_1(df_parquet)

NameError: name 'run_query_1' is not defined

### 2.3 Test query with partitioned parquet file

In [None]:
# Load the Parquet dataset stored under partition_parquet_path.
df_parquet_partition=spark.read.parquet(partition_parquet_path)

In [24]:
# Run Query 1 on the partitioned Parquet data.
run_query_1(df_parquet_partition)

Data frame after filter


                                                                                

+----------------+--------------+------------------+----+-------+----------+
|region_residence|region_travail|             poids|sexe|densite|statut_pro|
+----------------+--------------+------------------+----+-------+----------+
|              44|            76|0.8238963726995286|   1|      4|         2|
|              44|            28| 1.319271725135387|   2|      2|         1|
|              44|            75|0.6567601781470883|   2|      3|         2|
|              44|            84| 1.253597183134104|   1|      4|         1|
|              44|            76|0.8731775260562563|   1|      4|         2|
|              44|            84| 0.536695960149986|   1|      3|         1|
|              44|            93| 1.396082818263767|   2|      1|         1|
|              44|            75|0.5569451529336501|   2|      3|         1|
|              44|            32|0.9938252776533797|   1|      4|         1|
|              44|            24|0.5336664630957909|   1|      4|         1|



+----+-------+----------+------------------+
|sexe|densite|statut_pro|        sum(poids)|
+----+-------+----------+------------------+
|   1|      4|         1| 84860.02165580633|
|   1|      3|         1| 81647.73008880646|
|   1|      3|         2|20318.627070021397|
|   2|      2|         2| 4594.674335848756|
|   2|      4|         2|21197.941394322577|
|   2|      3|         1| 81280.18280711232|
|   2|      4|         1| 85804.86044169606|
|   1|      1|         1|2422.4620618296367|
|   2|      3|         2|20696.859938120782|
|   1|      4|         2|21014.734028133775|
|   1|      2|         1|18598.935870467656|
|   2|      1|         1| 2403.512048985781|
|   2|      2|         1|18421.386742139228|
|   1|      2|         2|  4560.49381746096|
|   1|      1|         2| 607.7579233084527|
|   2|      1|         2| 592.3437398684353|
+----+-------+----------+------------------+

Total time spent: 12.353963613510132


                                                                                

# Step 3. Create hive table

Now we want other people to be able to use this table, so we use the Hive metastore to store and publish it.

In [8]:
# Register the Parquet folder as an external table in the metastore.
data_path = f"{work_dir}/{parquet_file_name}"

# Build the "name type, name type, ..." column list from the DataFrame dtypes.
column_defs = [f"{name} {dtype}" for name, dtype in df_parquet.dtypes]
schema_str = ", ".join(column_defs)
table_name = "individus_aida"

create_table_sql = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name}
({schema_str})
STORED as parquet LOCATION '{data_path}'
"""
spark.sql(create_table_sql)

2022-05-30 09:33:20,635 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


DataFrame[]

In [9]:
# List the tables visible through the metastore.
spark.sql('show tables;').show()

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|  default|         fredo|      false|
|  default|     individus|      false|
|  default|individus_aida|      false|
|  default|        romain|      false|
+---------+--------------+-----------+



After we register the Parquet file as a Hive external table, we can use the `sql()` method to run pure SQL queries directly. Spark looks up the table name in the default database.

In [42]:
# Read a few rows through the metastore table.
# NOTE(review): this queries `individus`, whereas the cell above creates
# `individus_aida` — confirm which table is intended.
spark.sql("""SELECT * FROM individus limit 5""").show()



+----------------+---------------------+-----------------+--------------+-------------------+---------------+------------------+-------------+--------------+------------------+----+----------+-------+----------------+-------+---+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|region_residence|departement_residence|commune_residence|region_travail|departement_travail|commune_travail|commune_anterieure|commune_etude|pays_naissance|             poids|sexe|statut_pro|densite|recherche_emplo

                                                                                

In [48]:
# Query 1 in plain SQL against the metastore table `individus`.
query_1_sql = """
SELECT sum(poids), sexe, densite, statut_pro
FROM individus
WHERE region_residence='44' AND region_travail != '44' AND statut_pro != 'Z'
GROUP BY sexe, densite, statut_pro
"""
spark.sql(query_1_sql).show()



+------------------+----+-------+----------+
|        sum(poids)|sexe|densite|statut_pro|
+------------------+----+-------+----------+
| 84860.02165580635|   1|      4|         1|
| 81647.73008880643|   1|      3|         1|
|20318.627070021394|   1|      3|         2|
|4594.6743358487565|   2|      2|         2|
|21197.941394322577|   2|      4|         2|
| 81280.18280711232|   2|      3|         1|
| 85804.86044169607|   2|      4|         1|
|2422.4620618296362|   1|      1|         1|
|20696.859938120797|   2|      3|         2|
|21014.734028133786|   1|      4|         2|
| 18598.93587046767|   1|      2|         1|
| 2403.512048985782|   2|      1|         1|
|18421.386742139224|   2|      2|         1|
| 4560.493817460959|   1|      2|         2|
| 607.7579233084525|   1|      1|         2|
| 592.3437398684353|   2|      1|         2|
+------------------+----+-------+----------+



                                                                                

In [50]:
# Sum of `poids` per (diplome, age) for the selected communes of region 11,
# restricted to recherche_emploi values 1 and 2.
query_2_sql = """
SELECT sum(poids), age, diplome
FROM individus
WHERE recherche_emploi IN ('1','2') AND region_residence='11' AND commune_residence IN ('75013','75014','75015','92100','92130','92075','92240','92120','94250','94200')
GROUP BY diplome, age
"""
spark.sql(query_2_sql).show()



+------------------+---+-------+
|        sum(poids)|age|diplome|
+------------------+---+-------+
| 9.177751090295155|  0|      1|
| 6.360279128629814|  1|      6|
|14.023248192593122|  4|      7|
| 17.15541038136797|  2|      2|
|10.180609173643694|  7|      1|
|12.058477661296191|  1|      7|
| 9.255402890330409|  3|      2|
| 12.23768843458804|  7|      4|
| 11.07721187612884|  2|      1|
|17.704675201373025|  0|      6|
|11.098099052856004|  9|      1|
|6.5043199507982346|  5|      0|
| 5.680795776812895|  6|      5|
| 8.823483188902223|  3|      1|
|16.322972771017838|  5|      2|
|15.291267484605667|  0|      0|
|15.498980785319798|  2|      7|
| 9.126540292650773|  4|      5|
|15.581455705160748|  9|      7|
| 8.409815355659886|  4|      2|
+------------------+---+-------+
only showing top 20 rows



                                                                                

In [52]:
# Sum of `poids` per (pays_naissance, departement_residence) within region 44.
query_3_sql = """
SELECT sum(poids), pays_naissance, departement_residence
FROM individus
WHERE region_residence='44'
GROUP BY pays_naissance, departement_residence
"""
spark.sql(query_3_sql).show()



+------------------+--------------+---------------------+
|        sum(poids)|pays_naissance|departement_residence|
+------------------+--------------+---------------------+
| 86885.80634794573|            51|                   52|
| 74511.08026494356|            21|                   68|
|101801.91621098605|            41|                   55|
| 86962.65446028853|            31|                   52|
| 88498.68533646195|            12|                   10|
|148134.84723748371|            12|                   57|
| 87066.47522814618|            21|                   52|
|105343.26853180634|            31|                   67|
| 74258.88213579985|            31|                   68|
|103272.63170229828|            31|                   88|
|103167.34376872622|            51|                   88|
|104006.69954490617|            21|                   88|
|101665.45053942419|            10|                   55|
|125388.10926533878|            12|                   51|
| 92142.580919

                                                                                

# Step 4. Use Trino to analyse the hive table