In [1]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, TimestampType, DoubleType, LongType
from pyspark.sql.functions import col, from_unixtime, floor

In [2]:
# Path to the GCP service-account key used for GCS access. Prefer the standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable so the key location is not
# hard-coded in the notebook; fall back to the original local path otherwise.
credentials_location = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    '/home/xiangivyli/.gc/google_credential_spark.json',
)

# Stop any Spark session/context left over from a previous run so the
# SparkContext created below starts fresh with this configuration.
# `sc` is checked as well as `spark`: if only the context survived (e.g. the
# session cell never ran), stopping `spark` alone would leave it active and
# creating a new SparkContext would fail.
if 'spark' in globals():
    spark.stop()
if 'sc' in globals():
    sc.stop()

# Local Spark configuration: ship the GCS connector jar and enable
# service-account authentication with the key file above.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('RepartitionApp')
    .set("spark.jars", "/home/xiangivyli/lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [3]:
# Start the SparkContext from the configuration built in the previous cell.
sc = SparkContext(conf=conf)

# Register the GCS connector's filesystem classes and service-account auth on
# the underlying Hadoop configuration so `gs://` paths can be read and written.
# (`_jsc` is PySpark's handle to the Java SparkContext.)
hadoop_conf = sc._jsc.hadoopConfiguration()
for setting, value in (
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", credentials_location),
    ("fs.gs.auth.service.account.enable", "true"),
):
    hadoop_conf.set(setting, value)

24/03/30 10:08:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/30 10:08:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/03/30 10:08:56 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
# Build the SparkSession on top of the configured SparkContext; getOrCreate
# picks up the active context, so the GCS settings applied above stay in effect.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

## Read the raw job-postings CSV into a DataFrame

In [5]:
# Explicit schema for the raw job-postings CSV. Every field is nullable, so empty
# cells load as null. With the CSV reader's default `enforceSchema`, the schema
# is applied by position, so this order must match the column order in the file.
# NOTE(review): if the CSV stores the integer/long columns as float-formatted
# text (e.g. "2.0"), those values will not parse as Int/Long and, under the
# reader's default PERMISSIVE mode, come back as null — verify against the file.
job_postings_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("job_id", StringType()),
        ("company_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("max_salary", FloatType()),
        ("med_salary", FloatType()),
        ("min_salary", FloatType()),
        ("pay_period", StringType()),
        ("formatted_work_type", StringType()),
        ("location", StringType()),
        ("applies", IntegerType()),
        ("original_listed_time", LongType()),
        ("remote_allowed", StringType()),
        ("views", IntegerType()),
        ("job_posting_url", StringType()),
        ("application_url", StringType()),
        ("application_type", StringType()),
        ("expiry", LongType()),
        ("closed_time", LongType()),
        ("formatted_experience_level", StringType()),
        ("skills_desc", StringType()),
        ("listed_time", LongType()),
        ("posting_domain", StringType()),
        ("sponsored", IntegerType()),
        ("work_type", StringType()),
        ("currency", StringType()),
        ("compensation_type", StringType()),
        ("scraped", IntegerType()),
    ]
])

In [6]:
# Load the raw CSV from GCS using the explicit schema above.
# `escape='"'` makes a doubled quote inside a quoted field read as a literal
# quote, and `multiline` lets quoted fields (e.g. long descriptions) span lines.
df_posting = (
    spark.read
    .options(header="true", escape="\"", multiline="true")
    .schema(job_postings_schema)
    .csv("gs://de-zoomcamp-xiangivyli/final_project/raw/job_postings.csv")
)

In [7]:
df_posting.show(n=5)  # preview 5 rows; string values are cut to 20 chars by default

24/03/30 10:09:58 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 0:>                                                          (0 + 1) / 1]

+----------+----------+--------------------+--------------------+----------+----------+----------+----------+-------------------+----------------+-------+--------------------+--------------+-----+--------------------+--------------------+----------------+------+-----------+--------------------------+--------------------+-----------+--------------------+---------+---------+--------+-----------------+----------+
|    job_id|company_id|               title|         description|max_salary|med_salary|min_salary|pay_period|formatted_work_type|        location|applies|original_listed_time|remote_allowed|views|     job_posting_url|     application_url|application_type|expiry|closed_time|formatted_experience_level|         skills_desc|listed_time|      posting_domain|sponsored|work_type|currency|compensation_type|   scraped|
+----------+----------+--------------------+--------------------+----------+----------+----------+----------+-------------------+----------------+-------+------------------

                                                                                

In [8]:
# Columns holding Unix epoch times in milliseconds that should become
# proper timestamp columns.
timestamp_columns = ["original_listed_time", "expiry", "closed_time", "listed_time"]

# Convert each column from epoch milliseconds to a timestamp: dividing by 1000
# gives (fractional) seconds, which the cast to "timestamp" reads as seconds
# since the epoch.
# Columns that are already TimestampType are skipped. Without this check,
# re-running the cell would apply the millisecond division a second time to
# already-converted columns.
for column_name in timestamp_columns:
    if isinstance(df_posting.schema[column_name].dataType, TimestampType):
        continue
    df_posting = df_posting.withColumn(
        column_name,
        (col(column_name) / 1000).cast("timestamp"),
    )

In [10]:
# Redistribute the rows into 10 partitions (so the output is split into about
# 10 part files) and write them to GCS as parquet, replacing any previous output.
(
    df_posting
    .repartition(10)
    .write
    .mode("overwrite")
    .parquet("gs://de-zoomcamp-xiangivyli/final_project/pq-linkedin-job-postings/job_posting_pq/")
)

                                                                                