## Imports and Initialization of script

In [1]:
!pip install pyspark

import sys
import os

os.environ['HADOOP_HOME'] = "C:\Hadoop"
sys.path.append("C:\Hadoop\bin")

os.environ['JAVA_HOME'] = "C:\Java"
sys.path.append("C:\Java\bin")



In [2]:
from pyspark.sql.functions import sum,avg,max,count, round, col, asc, DataFrame, expr, split, lit
from pyspark.sql import Row

from functools import reduce 

In [41]:
from pyspark.sql import SparkSession

spark = SparkSession.\
builder.\
appName("pyspark-notebook2").\
master("local").\
config("spark.executor.memory", "1g").\
config("spark.mongodb.input.uri","mongodb://127.0.0.1/Project6").\
config("spark.mongodb.output.uri","mongodb://127.0.0.1/Project6").\
config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
config("spark.jars", "\Spark\spark-3.3.1-bin-hadoop3\jars\mysql-connector-j-8.0.32.jar").\
getOrCreate()

In [4]:
# optimization of speed

spark.conf.set("spark.sql.shuffle.partitions", "500")

Rdd = spark.sparkContext.parallelize((0,20))
print("From local[5]"+str(Rdd.getNumPartitions()))

Rdd1 = spark.sparkContext.parallelize((0,25), 6)
print("parallelize : "+str(Rdd1.getNumPartitions()))
# Rdd1.saveAsTextFile("/FileStore/tables/partition22")

# Using repartition() function
Rdd2 = Rdd1.repartition(5)
print("Repartition size : " + str(Rdd2.getNumPartitions()))
# Rdd2.saveAsTextFile("/FileStore/tables/re-partition22")

# Using coalesce() function
Rdd3 = Rdd1.coalesce(5)
print("Repartition size : " + str(Rdd3.getNumPartitions()))
# Rdd3.saveAsTextFile("/FileStore/tables/coalesce22")

From local[5]1
parallelize : 6
Repartition size : 5
Repartition size : 5


## Extract

In [5]:
collection_list = ['Research_2007_2013', 'Research_2007_2013_org', 'Research_2007_2013_themes',\
                  'Research_2014_2020', 'Research_2014_2020_org', 'Research_2014_2020_themes',\
                  'Research_2021_2027', 'Research_2021_2027_org', 'Research_2021_2027_themes',\
                  'OCDE_ind']
    
def extract_collection(collection):
    spark.conf.set('spark.sql.caseSensitive', True)
    
    df_collection = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri", "mongodb://localhost:27017/Project6") \
    .option("database", "Project6") \
    .option("collection", collection) \
    .load()
    return df_collection

result = list(map(extract_collection, collection_list))

In [6]:
#check the size of the df's
print(collection_list[0], 'has ',result[0].count(), 'rows and ', len(result[0].columns), 'columns')
print(collection_list[1], 'has ',result[1].count(), 'rows and ', len(result[1].columns), 'columns')
print(collection_list[2], 'has ',result[2].count(), 'rows and ', len(result[2].columns), 'columns')
print(collection_list[3], 'has ',result[3].count(), 'rows and ', len(result[3].columns), 'columns')
print(collection_list[4], 'has ',result[4].count(), 'rows and ', len(result[4].columns), 'columns')
print(collection_list[5], 'has ',result[5].count(), 'rows and ', len(result[5].columns), 'columns')
print(collection_list[6], 'has ',result[6].count(), 'rows and ', len(result[6].columns), 'columns')
print(collection_list[7], 'has ',result[7].count(), 'rows and ', len(result[7].columns), 'columns')
print(collection_list[8], 'has ',result[8].count(), 'rows and ', len(result[8].columns), 'columns')

Research_2007_2013 has  25785 rows and  18 columns
Research_2007_2013_org has  140055 rows and  21 columns
Research_2007_2013_themes has  68651 rows and  5 columns
Research_2014_2020 has  35382 rows and  21 columns
Research_2014_2020_org has  177078 rows and  25 columns
Research_2014_2020_themes has  122551 rows and  5 columns
Research_2021_2027 has  5250 rows and  20 columns
Research_2021_2027_org has  36680 rows and  24 columns
Research_2021_2027_themes has  15810 rows and  5 columns


In [7]:
# list_x = [0, 1, 2]

# def printing(x):
#     print (collection_list[x], 'has ',result[x].count(), 'rows and ', len(result[x].columns), 'columns')
    
# list(map(printing(list_x)))

In [76]:
#extract the industry dataset
OCDE_ind = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri", "mongodb://localhost:27017/Project6") \
    .option("database", "Project6") \
    .option("collection", 'OCDE_ind') \
    .load()

## Transform

In [8]:
#drop columns

In [9]:
#drop columns on MAIN research dataframes
columns_main_07_13 = ('acronym', 'status', 'legalBasis', 'topics', 'subCall', 'fundingScheme', 'contentUpdateDate', 'nature')
columns_main_other = ('acronym', 'status', 'legalBasis', 'topics', 'subCall', 'fundingScheme', 'contentUpdateDate', 'ecSignatureDate', 'grantDoi', 'masterCall', 'nature')

research_2007_2013 = result[0].drop(*columns_main_07_13)
research_2014_2020 = result[3].drop(*columns_main_other)
research_2021_2027 = result[6].drop(*columns_main_other)

#check number of columns
print('The number of columns before drop were ', len(result[0].columns), 'and now are ', len(research_2007_2013.columns))
print('The number of columns before drop were ', len(result[3].columns), 'and now are ', len(research_2014_2020.columns))
print('The number of columns before drop were ', len(result[6].columns), 'and now are ', len(research_2014_2020.columns))

The number of columns before drop were  18 and now are  10
The number of columns before drop were  21 and now are  10
The number of columns before drop were  20 and now are  10


In [10]:
#drop columns on ORG research dataframes
columns_org_07_13 = ('projectAcronym', 'vatNumber', 'street', 'postCode', 'organizationURL', 'contactForm', 'contentUpdateDate', 'endOfParticipation')
columns_org_other = ('projectAcronym', 'vatNumber', 'street', 'postCode', 'organizationURL', 'contactForm', 'contentUpdateDate', 'endOfParticipation', 'SME', 'active', 'totalCost', 'netEcContribution')

research_2007_2013_org = result[1].drop(*columns_org_07_13)
research_2014_2020_org = result[4].drop(*columns_org_other)
research_2021_2027_org = result[7].drop(*columns_org_other)

#check number of columns
print('The number of columns before drop were ', len(result[1].columns), 'and now are ', len(research_2007_2013_org.columns))
print('The number of columns before drop were ', len(result[4].columns), 'and now are ', len(research_2014_2020_org.columns))
print('The number of columns before drop were ', len(result[7].columns), 'and now are ', len(research_2014_2020_org.columns))

The number of columns before drop were  21 and now are  13
The number of columns before drop were  25 and now are  13
The number of columns before drop were  24 and now are  13


In [11]:
#merge all main research df's

df_merge = research_2007_2013.unionAll(research_2014_2020)
df_research_main = df_merge.unionAll(research_2021_2027)

In [12]:
#create new column ecContribution% on research main

df_research_main.withColumn("% EcContibution", df_research_main.ecMaxContribution*100/df_research_main.totalCost)

DataFrame[_id: struct<oid:string>, ecMaxContribution: int, endDate: timestamp, frameworkProgramme: string, id: string, objective: string, rcn: string, startDate: timestamp, title: string, totalCost: double, % EcContibution: double]

In [13]:
#create new column start year

df_research_main = df_research_main.withColumn('start_year', split(df_research_main['startDate'], '-').getItem(0)) \
       .withColumn('end_year', split(df_research_main['endDate'], '-').getItem(1))

In [14]:
#merge all org research df's

df_merge1 = research_2007_2013_org.unionAll(research_2014_2020_org)
df_research_org = df_merge1.unionAll(research_2021_2027_org)

In [15]:
#create new column activityType_name on research org

df_research_org = df_research_org.withColumn("activityType_name", expr("CASE WHEN activityType = 'HES' THEN 'Education Establishments' " + 
               "WHEN activityType = 'REC' THEN 'Research Organisations' WHEN activityType = 'PRC' THEN 'Private sector' \
               WHEN activityType = 'PRC' THEN 'Private sector' \
               WHEN activityType = 'PUB' THEN 'Public bodies' \
               WHEN activityType = 'OTH' THEN 'Other' \
               WHEN activityType IS NULL THEN 'Not_considered'" +\
               "ELSE activityType END"))

In [16]:
#Split column geolocation on research org

df_research_org = df_research_org.withColumn('latitude', split(df_research_org['geolocation'], ',').getItem(0)) \
       .withColumn('longitude', split(df_research_org['geolocation'], ',').getItem(1))

In [17]:
#merge all themes research df's

#check column names
print(result[2].schema.names)
print(result[5].schema.names)
print(result[8].schema.names)
#ok to merge

['_id', 'euroSciVocCode', 'euroSciVocPath', 'euroSciVocTitle', 'projectID']
['_id', 'euroSciVocCode', 'euroSciVocPath', 'euroSciVocTitle', 'projectID']
['_id', 'euroSciVocCode', 'euroSciVocPath', 'euroSciVocTitle', 'projectID']


In [18]:
df_merge2 = result[2].unionAll(result[5])
df_research_themes = df_merge2.unionAll(result[8])

In [19]:
#Split column path in THEMES research dataframes
cols_to_drop = ('to_drop', 'sub_sub_path')


df_research_themes = df_research_themes.withColumn('to_drop', split(df_research_themes['euroSciVocPath'], '/').getItem(0)) \
        .withColumn('main_path', split(df_research_themes['euroSciVocPath'], '/').getItem(1)) \
        .withColumn('sub_path', split(df_research_themes['euroSciVocPath'], '/').getItem(2)) \
        .withColumn('sub_sub_path', split(df_research_themes['euroSciVocPath'], '/').getItem(3)) \
        .drop(*cols_to_drop)

In [20]:
#merge all dfs
df_research_main_org = df_research_org.join(df_research_main,df_research_org["projectID"] == df_research_main["id"])

In [21]:
df_research_all = df_research_main_org.join(df_research_themes,df_research_main_org["projectID"] == df_research_themes["projectID"]).drop('_id', 'projectID', 'rcn')

In [22]:
# df_time_sum = (df_OCDE_ind.groupby('TIME')\
# .agg(sum('Value')\
# .alias('sum'))\
# .sort(col('TIME')\
# .desc()))

# df_time_sum.show()

In [23]:
df_research_all.schema.names

['activityType',
 'city',
 'country',
 'ecContribution',
 'geolocation',
 'name',
 'order',
 'organisationID',
 'role',
 'shortName',
 'activityType_name',
 'latitude',
 'longitude',
 'ecMaxContribution',
 'endDate',
 'frameworkProgramme',
 'id',
 'objective',
 'startDate',
 'title',
 'totalCost',
 'start_year',
 'end_year',
 'euroSciVocCode',
 'euroSciVocPath',
 'euroSciVocTitle',
 'main_path',
 'sub_path']

In [24]:
#drop rows where nans

print('Before drop start_year: ', df_research_all.count())
df_research_all = df_research_all.na.drop(subset=['start_year'])
print('After drop start_year: ', df_research_all.count())

df_research_all = df_research_all.na.drop(subset=['country'])
print('After drop country: ', df_research_all.count())


Before drop start_year:  1057621
After drop start_year:  1057216
After drop country:  1057202


#### Trying ways to optimize speed during pyspark queries in a 1M row df

In [25]:
#default parameters
#https://www.projectpro.io/recipes/explain-repartition-and-coalesce-functions-pyspark-databricks
# import time
# start_time = time.time()
# (df_research_all.groupby('country')
# .agg(count('id')
# .alias('count'))
# .sort(col('count')
# .desc())
# .show())
# print(f"Execution time: {time.time() - start_time}")

# Execution time: 73.11822366714478

In [26]:
# #trying cache
# df_research_all.cache()

# start_time = time.time()
# (df_research_all.groupby('country')
# .agg(count('id')
# .alias('count'))
# .sort(col('count')
# .desc())
# .show())
# print(f"Execution time: {time.time() - start_time}")

In [27]:
# #trying partitions and coalesce
# print(df_research_all.rdd.getNumPartitions())


# spark.conf.set("spark.sql.shuffle.partitions", "500")

In [28]:
# Rdd = spark.sparkContext.parallelize((0,20))
# print("From local[5]"+str(Rdd.getNumPartitions()))

# Rdd1 = spark.sparkContext.parallelize((0,25), 6)
# print("parallelize : "+str(Rdd1.getNumPartitions()))
# # Rdd1.saveAsTextFile("/FileStore/tables/partition22")

# # Using repartition() function
# Rdd2 = Rdd1.repartition(5)
# print("Repartition size : " + str(Rdd2.getNumPartitions()))
# # Rdd2.saveAsTextFile("/FileStore/tables/re-partition22")

# # Using coalesce() function
# Rdd3 = Rdd1.coalesce(5)
# print("Repartition size : " + str(Rdd3.getNumPartitions()))
# # Rdd3.saveAsTextFile("/FileStore/tables/coalesce22")

In [29]:
# start_time = time.time()
# (df_research_all.groupby('country')
# .agg(count('id')
# .alias('count'))
# .sort(col('count')
# .desc())
# .show())
# print(f"Execution time: {time.time() - start_time}")

# Execution time: 15.6615395475815

#### finished optimization test

In [30]:
#create column with country_name from country

country_codes = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri", "mongodb://localhost:27017/Project6") \
    .option("database", "Project6") \
    .option("collection", 'country_codes') \
    .load()

In [31]:
country_codes.schema.names

['_id',
 'alpha-2',
 'alpha-3',
 'country-code',
 'intermediate-region',
 'intermediate-region-code',
 'iso_3166-2',
 'name',
 'region',
 'region-code',
 'sub-region',
 'sub-region-code']

In [32]:
country_codes = country_codes.drop("country-code")

In [33]:
country_codes = country_codes.withColumnRenamed("name", "country_name")

In [34]:
country_codes.schema.names

['_id',
 'alpha-2',
 'alpha-3',
 'intermediate-region',
 'intermediate-region-code',
 'iso_3166-2',
 'country_name',
 'region',
 'region-code',
 'sub-region',
 'sub-region-code']

In [35]:
df_research_all = df_research_all.join\
    (country_codes, df_research_all["country"] == country_codes["alpha-2"])\
    .drop('_id', 'alpha-3', "alpha-2", 'intermediate-region',\
     'intermediate-region-code',\
     'iso_3166-2',\
     'region',\
     'region-code',\
     'sub-region',\
     'sub-region-code',
     'country-code', 'name')

In [36]:
df_research_all.schema.names

['activityType',
 'city',
 'country',
 'ecContribution',
 'geolocation',
 'order',
 'organisationID',
 'role',
 'shortName',
 'activityType_name',
 'latitude',
 'longitude',
 'ecMaxContribution',
 'endDate',
 'frameworkProgramme',
 'id',
 'objective',
 'startDate',
 'title',
 'totalCost',
 'start_year',
 'end_year',
 'euroSciVocCode',
 'euroSciVocPath',
 'euroSciVocTitle',
 'main_path',
 'sub_path',
 'country_name']

In [38]:
(df_research_all.groupby('country_name')
.agg(count('id')
.alias('count'))
.sort(col('count')
.desc())
.show())

+--------------------+------+
|        country_name| count|
+--------------------+------+
|             Germany|132231|
|               Spain|107646|
|              France|102638|
|               Italy| 99160|
|         Netherlands| 67784|
|             Belgium| 47666|
|         Switzerland| 33082|
|              Sweden| 32324|
|             Austria| 28667|
|             Denmark| 24586|
|            Portugal| 21615|
|             Finland| 21610|
|              Norway| 18372|
|             Ireland| 17142|
|              Poland| 15834|
|              Israel| 13190|
|             Czechia| 11133|
|United States of ...|  9694|
|             Hungary|  9445|
|             Romania|  8173|
+--------------------+------+
only showing top 20 rows



In [78]:
#transform OCDE dataset
OCDE_ind= OCDE_ind.drop('_id', 'IND', "VAR", 'LOCATION')

## Load

In [None]:
# df_research_all.write \
#   .format("jdbc") \
#   .option("driver","com.mysql.cj.jdbc.Driver") \
#   .option("url", "jdbc:mysql://localhost:3306/Project6") \
#   .option("dbtable", "df_research_all") \
#   .option("user", "root") \
#   .option("password", "M1a2r3t4a5!") \
#   .mode('overwrite') \
#   .save()

In [39]:
df_research_all.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/Project6") \
  .option("dbtable", "df_research_all") \
  .option("user", "root") \
  .option("password", "M1a2r3t4a5!") \
  .save()

In [80]:
OCDE_ind.write \
  .format("jdbc") \
  .option("driver","com.mysql.cj.jdbc.Driver") \
  .option("url", "jdbc:mysql://localhost:3306/Project6") \
  .option("dbtable", "OCDE_ind") \
  .option("user", "root") \
  .option("password", "M1a2r3t4a5!") \
  .mode('overwrite').save()