In [0]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
# Create (or reuse) the Spark session used by every cell below.
# NOTE(review): with master("local") the 'spark.executor.instances' setting
# presumably has no effect (local mode runs in a single JVM) -- confirm before
# relying on it for parallelism.
spark = (
    SparkSession.builder
    .master("local")
    .config('spark.executor.instances', 8)
    .appName("Data sicnece salary analysis")
    .getOrCreate()
)

# Azure Storage account name. Taken from the AZURE_STORAGE_ACCOUNT environment
# variable so the real value never has to be committed with the notebook; the
# original placeholder is kept as the fallback.
storage_account_name = os.environ.get("AZURE_STORAGE_ACCOUNT", "*****")

# Azure Storage Account Key. Taken from AZURE_STORAGE_KEY for the same reason;
# falls back to the original empty string when the variable is not set.
storage_account_key = os.environ.get("AZURE_STORAGE_KEY", "")

# Azure Storage Account Source Container
container = "data"

# Set the configuration details to read/write
spark.conf.set(
    "fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name),
    storage_account_key,
)
     

In [0]:
# Location of the raw salaries CSV in Azure Blob Storage (wasb scheme).
file_location = "wasb://data@rajdep561.blob.core.windows.net/ds_salaries.csv"

# Read the CSV treating the first row as the header. No schema inference is
# requested, so columns are loaded with Spark's default CSV typing.
df = (
    spark.read
    .option('header', 'true')
    .csv(file_location)
)
df.show()

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|     2023|              SE|             FT|Principal Data Sc...| 80000|            EUR|        85847|                ES|         100|              ES|           L|
|     2023|              MI|             CT|         ML Engineer| 30000|            USD|        30000|                US|         100|              US|           S|
|     2023|              MI|             CT|         ML Engineer| 25500|            USD|        25500|                US|         100|              US|           S|
|     2023

In [0]:
# Convert the three numeric measures to integers; every other column keeps
# the type it was loaded with.
df = (
    df
    .withColumn('salary', col('salary').cast('int'))
    .withColumn('salary_in_usd', col('salary_in_usd').cast('int'))
    .withColumn('remote_ratio', col('remote_ratio').cast('int'))
)
df.show()

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|     2023|              SE|             FT|Principal Data Sc...| 80000|            EUR|        85847|                ES|         100|              ES|           L|
|     2023|              MI|             CT|         ML Engineer| 30000|            USD|        30000|                US|         100|              US|           S|
|     2023|              MI|             CT|         ML Engineer| 25500|            USD|        25500|                US|         100|              US|           S|
|     2023

In [0]:
# Replace missing values in the categorical/text columns with the '-' marker
# so that every row has a usable key when the columns are encoded later.
df = df.fillna(
    '-',
    subset=[
        "work_year",
        "experience_level",
        "employment_type",
        "job_title",
        "salary_currency",
        "employee_residence",
        "company_location",
        "company_size",
    ],
)
df.show()

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|     2023|              SE|             FT|Principal Data Sc...| 80000|            EUR|        85847|                ES|         100|              ES|           L|
|     2023|              MI|             CT|         ML Engineer| 30000|            USD|        30000|                US|         100|              US|           S|
|     2023|              MI|             CT|         ML Engineer| 25500|            USD|        25500|                US|         100|              US|           S|
|     2023

In [0]:
def _make_id_encoder(frame, column):
    """Build a value -> integer id encoder for one categorical column.

    Parameters
    ----------
    frame : DataFrame
        Spark DataFrame that contains `column`.
    column : str
        Name of the categorical column to encode.

    Returns
    -------
    tuple
        ``(values, mapping, encoder)`` where

        * ``values`` is the array of distinct values as returned by
          ``toPandas().values`` (one single-element row per value),
        * ``mapping`` maps each distinct value to its position in ``values``,
        * ``encoder`` is a Spark UDF that returns ``mapping[value]``.

    Notes
    -----
    Ids follow the row order produced by ``distinct()``; that order is not
    explicitly sorted here, so ids are only meaningful together with the
    lookup tables built from the same run.
    """
    values = frame.select(column).distinct().toPandas().values
    mapping = {row[0]: position for position, row in enumerate(values)}
    # The lambda closes over the *local* `mapping`, so rebinding a notebook
    # global of the same name later (e.g. `emp_type`) cannot change what the
    # UDF looks up. The explicit "int" return type makes the id columns
    # integers instead of the UDF default (string).
    encoder = udf(lambda value: mapping[value], "int")
    return values, mapping, encoder


# One encoder per categorical dimension. The global names are the same ones
# the following cells use (`*_udf`), plus the value arrays and dicts.
experiences, exper_d, exper_udf = _make_id_encoder(df, 'experience_level')

employment_type, emp_type, emp_udf = _make_id_encoder(df, 'employment_type')

job_title, jb_title, jb_udf = _make_id_encoder(df, 'job_title')

salary_curr, sal_cur, sal_udf = _make_id_encoder(df, 'salary_currency')

location, cmp_local, local_udf = _make_id_encoder(df, 'company_location')

emp_location, emp_local, emp_local_udf = _make_id_encoder(df, 'employee_residence')


In [0]:
# Add the id column for experience level, then keep the distinct
# (id, label) pairs as the lookup table for this dimension.
df = df.withColumn(
    'experience_level_id',
    exper_udf(col('experience_level')),
)
exp_level = df.select(['experience_level_id', 'experience_level']).dropDuplicates()
exp_level.show()

+-------------------+----------------+
|experience_level_id|experience_level|
+-------------------+----------------+
|                  2|              EN|
|                  1|              MI|
|                  3|              SE|
|                  0|              EX|
+-------------------+----------------+



In [0]:
# Add the id column for employment type, then keep the distinct
# (id, label) pairs as the lookup table for this dimension.
df = df.withColumn(
    'employment_type_id',
    emp_udf(col('employment_type')),
)
# NOTE: this rebinds the name `emp_type`, which the encoder cell also uses
# for its value -> id dict; from here on `emp_type` is a DataFrame.
emp_type = df.select(['employment_type_id', 'employment_type']).dropDuplicates()
emp_type.show()

+------------------+---------------+
|employment_type_id|employment_type|
+------------------+---------------+
|                 1|             PT|
|                 0|             FT|
|                 3|             FL|
|                 2|             CT|
+------------------+---------------+



In [0]:
# Add the id column for job title, then keep the distinct
# (id, label) pairs as the lookup table for this dimension.
df = df.withColumn(
    'job_title_id',
    jb_udf(col('job_title')),
)
jb_tl = df.select(['job_title_id', 'job_title']).dropDuplicates()
jb_tl.show()

+------------+--------------------+
|job_title_id|           job_title|
+------------+--------------------+
|          33|Business Intellig...|
|          35|Applied Machine L...|
|          72|        AI Developer|
|          61|  Analytics Engineer|
|           2|        Data Modeler|
|          62|   Research Engineer|
|          23|Principal Data Sc...|
|          68|Data Analytics Ma...|
|          60|   Applied Scientist|
|          75|     Data Strategist|
|          46|      Data Scientist|
|          28|Machine Learning ...|
|          83|      Data Architect|
|          65|Business Data Ana...|
|          74|Data Quality Analyst|
|          92|Compliance Data A...|
|          39|  Research Scientist|
|          71|Computer Vision E...|
|          90|       Data Engineer|
|          58|         ML Engineer|
+------------+--------------------+
only showing top 20 rows



In [0]:
# Add the id column for salary currency, then keep the distinct
# (id, label) pairs as the lookup table for this dimension.
df = df.withColumn(
    'salary_currency_id',
    sal_udf(col('salary_currency')),
)
sal_currency = df.select(['salary_currency_id', 'salary_currency']).dropDuplicates()
sal_currency.show()

+------------------+---------------+
|salary_currency_id|salary_currency|
+------------------+---------------+
|                 4|            BRL|
|                18|            USD|
|                15|            CLP|
|                 1|            HUF|
|                12|            JPY|
|                 5|            CZK|
|                 2|            GBP|
|                19|            SGD|
|                17|            PLN|
|                 3|            CHF|
|                 8|            EUR|
|                13|            HKD|
|                10|            ILS|
|                11|            AUD|
|                 6|            TRY|
|                16|            MXN|
|                14|            INR|
|                 0|            DKK|
|                 9|            THB|
|                 7|            CAD|
+------------------+---------------+



In [0]:
# Add the id column for company location, then keep the distinct
# (id, label) pairs as the lookup table for this dimension.
df = df.withColumn(
    'company_location_id',
    local_udf(col('company_location')),
)
company_location = df.select(['company_location_id', 'company_location']).dropDuplicates()
company_location.show()

+-------------------+----------------+
|company_location_id|company_location|
+-------------------+----------------+
|                 65|              NG|
|                  2|              FI|
|                 66|              CF|
|                 19|              GH|
|                 31|              DE|
|                 43|              CH|
|                 34|              IL|
|                 59|              IE|
|                 40|              IN|
|                  5|              NL|
|                 20|              HK|
|                 48|              SE|
|                 42|              FR|
|                 64|              CO|
|                 26|              GB|
|                 23|              AU|
|                 37|              US|
|                 25|              CA|
|                  3|              UA|
|                 32|              ES|
+-------------------+----------------+
only showing top 20 rows



In [0]:
# Add the id column for employee residence, then keep the distinct
# (id, label) pairs as the lookup table for this dimension.
df = df.withColumn(
    'employee_residence_id',
    emp_local_udf(col('employee_residence')),
)
emp_residence = df.select(['employee_residence_id', 'employee_residence']).dropDuplicates()
emp_residence.show()

+---------------------+------------------+
|employee_residence_id|employee_residence|
+---------------------+------------------+
|                    2|                FI|
|                   18|                GH|
|                   31|                DE|
|                   72|                CF|
|                   12|                AT|
|                   43|                CH|
|                   34|                IL|
|                   40|                IN|
|                    5|                NL|
|                   42|                FR|
|                   26|                GB|
|                   19|                HK|
|                   69|                CO|
|                   24|                CA|
|                   37|                US|
|                   22|                AU|
|                    3|                UA|
|                   70|                NG|
|                   63|                IE|
|                   32|                ES|
+----------

In [0]:
# Raw text columns that now have a matching *_id column; they are removed
# from the main frame so it only carries the ids.
cols = (
    "employee_residence",
    "company_location",
    "salary_currency",
    'job_title',
    'employment_type',
    'experience_level',
)

df = df.drop(*cols)

In [0]:
def build_jdbc_url(user="", password="", redact=False):
    """Assemble the SQL Server JDBC connection URL for the `eda` database.

    Parameters
    ----------
    user : str
        Login name placed in the ``user=`` property (may be empty).
    password : str
        Login password. When empty, no ``password=`` property is emitted,
        which reproduces the credential-free URL this cell used to hard-code.
    redact : bool
        When true, the password value is replaced by ``*****`` so the result
        is safe to print or store in notebook output.

    Returns
    -------
    str
        The ``jdbc:sqlserver://`` URL, terminated by ``;``.
    """
    properties = [
        "jdbc:sqlserver://rajdep561.database.windows.net:1433",
        "database=eda",
        f"user={user}",
    ]
    if password:
        shown = "*****" if redact else password
        properties.append(f"password={shown}")
    properties += [
        "encrypt=true",
        "trustServerCertificate=false",
        "hostNameInCertificate=*.database.windows.net",
        "loginTimeout=30",
    ]
    return ";".join(properties) + ";"


# Credentials come from the environment (AZURE_SQL_USER / AZURE_SQL_PASSWORD)
# so they are never stored in the notebook source.
jdbc_user = os.environ.get("AZURE_SQL_USER", "")
jdbc_password = os.environ.get("AZURE_SQL_PASSWORD", "")

url_jdbc = build_jdbc_url(jdbc_user, jdbc_password)

# Print a masked copy only: cell output is saved with the notebook, so the
# real URL (which embeds the password) must not be echoed.
print(build_jdbc_url(jdbc_user, jdbc_password, redact=True))

jdbc:sqlserver://rajdep561.database.windows.net:1433;database=eda;user=rajdep561@rajdep561;password=*****;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;


In [0]:
# (DataFrame, destination table) pairs, listed in the order they are written:
# the six lookup tables first, then the main frame as `fact_ds`.
jdbc_targets = [
    (company_location, "company_location"),
    (emp_residence, "employee_residence"),
    (sal_currency, "salary_currency"),
    (jb_tl, "job_title"),
    (emp_type, "employment_type"),
    (exp_level, "experience_level"),
    (df, "fact_ds"),
]

# Each table is written over JDBC in overwrite mode using the shared URL.
for frame, table_name in jdbc_targets:
    (
        frame.write
        .mode("overwrite")
        .format("jdbc")
        .option("url", url_jdbc)
        .option("dbtable", table_name)
        .save()
    )
