In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# One SparkSession for the whole notebook; getOrCreate() reuses it when this cell is re-run.
# The session time zone is pinned to UTC so that timestamp rendering, hour/date extraction
# and epoch-second <-> timestamp conversions give the same results on every machine,
# instead of depending on the local time zone of whoever runs the notebook.
spark = (
    SparkSession.builder
    .appName("mbd-prj")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/07 14:46:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Reading the datasets

In [3]:
import os
from pathlib import Path

# Root of the raw data. Set the MBD_DATA_DIR environment variable to run the notebook on
# another machine; the default is the original author's local OneDrive folder.
DATA_DIR = Path(os.environ.get(
    "MBD_DATA_DIR",
    "/Users/lgk1910/Library/CloudStorage/OneDrive-AaltoUniversity/Learning2/Big Data/Project/data",
))
# Folders of parquet files for the two chat corpora.
lymsys_folder = DATA_DIR / "lmsys-chat-1m"
wildchat_folder = DATA_DIR / "WildChat-1M-Full" / "WildChat-1M-Full"

In [4]:
# Load both chat corpora (each stored as a folder of parquet files) and preview a few rows.
# spark.read.parquet expects string paths, hence the str() around the Path objects.
lymsys_df = spark.read.parquet(str(lymsys_folder))
wildchat_df = spark.read.parquet(str(wildchat_folder))

# NOTE(review): the WildChat preview includes columns such as hashed_ip and header —
# check the rendered output before sharing the notebook.
for preview_df in (lymsys_df, wildchat_df):
    preview_df.show(5)


+--------------------+----------+-----------------------------------+----+----------+--------------------+--------+
|     conversation_id|     model|                       conversation|turn|  language|   openai_moderation|redacted|
+--------------------+----------+-----------------------------------+----+----------+--------------------+--------+
|c20948192699451b8...|vicuna-13b|               [{Given the artic...|   1|   English|[{{false, false, ...|    true|
|8b02aa2df44847a1b...|vicuna-33b|               [{Alright, let's ...|   2|   English|[{{false, false, ...|    true|
|c18f3612fac140cb9...|vicuna-13b|[{あなたが最終更新されたのはいつ...|   5|  Japanese|[{{false, false, ...|   false|
|d1fabb62e3364665a...| llama-13b|               [{You are an AI a...|   1|   English|[{{false, false, ...|    true|
|b2da335248f04e439...| koala-13b|               [{Reescreva esse ...|   1|Portuguese|[{{false, false, ...|   false|
+--------------------+----------+-----------------------------------+----+----------+--

# Sample code

## Read the country/state time-zone table and join it with the WildChat dataset

### Install dependencies

In [5]:
!pip install timezonefinder geopy pytz


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Sample code

In [6]:
# Load the country/state -> time-zone lookup table. The path is relative, so the file
# must be reachable from the notebook's working directory.
country_state_df = spark.read.parquet("country_state_timezone.parquet")
n_country_state_rows = country_state_df.count()
n_country_state_rows  # 2433 rows when this notebook was last run

2433

In [7]:
# Peek at a single lookup row: gmt_offset is a struct with `hours` and `minutes` fields.
first_row = country_state_df.first()
print(first_row["gmt_offset"])

Row(hours=5, minutes=30)


In [8]:
# Attach timezone / gmt_offset to every WildChat conversation with a left join on
# (country, state). eqNullSafe lets a null `state` match the lookup row whose `sstate` is
# also null, so country-level entries still resolve. Unmatched rows keep null
# timezone/gmt_offset.
# The guard makes the cell idempotent: because wildchat_df is rebound to the join result,
# re-running an unguarded join would add a second copy of the lookup columns and make
# later references to `timezone` / `gmt_offset` ambiguous.
# NOTE(review): assumes (ccountry, sstate) is unique in country_state_df; duplicate keys
# would silently multiply WildChat rows — verify.
if "timezone" not in wildchat_df.columns:
    wildchat_df = wildchat_df.join(
        country_state_df,
        (wildchat_df["country"] == country_state_df["ccountry"])
        & wildchat_df["state"].eqNullSafe(country_state_df["sstate"]),
        "left",
    )

# Conversations whose location found no time zone: only a small fraction
# (1262 when this notebook was last run).
wildchat_df.filter(F.col("timezone").isNull()).count()


                                                                                

1262

In [9]:
# The lookup table's join keys are redundant with country/state once the join is done.
lookup_key_cols = ("ccountry", "sstate")
wildchat_df = wildchat_df.drop(*lookup_key_cols)
wildchat_df.columns

['conversation_hash',
 'model',
 'timestamp',
 'conversation',
 'turn',
 'language',
 'openai_moderation',
 'detoxify_moderation',
 'toxic',
 'redacted',
 'state',
 'country',
 'hashed_ip',
 'header',
 'timezone',
 'gmt_offset']

In [10]:
# Approximate each conversation's local time: take the timestamp as epoch seconds, add the
# lookup table's GMT offset (hours and minutes, converted to seconds) and cast back to a
# timestamp. Rows without a matched gmt_offset end up with a null offseted_timestamp.
# NOTE(review): the shifted value reads as local wall-clock time only when
# spark.sql.session.timeZone is UTC, and a fixed gmt_offset ignores daylight saving time
# (the joined `timezone` column may allow a DST-aware conversion) — confirm.
offset_seconds = F.col("gmt_offset.hours") * 3600 + F.col("gmt_offset.minutes") * 60
shifted_epoch = F.unix_timestamp("timestamp") + offset_seconds
wildchat_df = wildchat_df.withColumn("offseted_timestamp", shifted_epoch.cast("timestamp"))

wildchat_df.select("timestamp", "gmt_offset", "offseted_timestamp").show(5)

+-------------------+----------+-------------------+
|          timestamp|gmt_offset| offseted_timestamp|
+-------------------+----------+-------------------+
|2023-09-13 04:14:27|   {-5, 0}|2023-09-12 23:14:27|
|2023-09-13 04:14:58|    {8, 0}|2023-09-13 12:14:58|
|2023-09-13 04:15:23|    {1, 0}|2023-09-13 05:15:23|
|2023-09-13 04:16:17|    {9, 0}|2023-09-13 13:16:17|
|2023-09-13 04:16:35|   {-5, 0}|2023-09-12 23:16:35|
+-------------------+----------+-------------------+
only showing top 5 rows

