In [None]:
import os

from dotenv import load_dotenv
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

In [None]:
# Load variables from a .env file into the process environment (python-dotenv's
# `load_dotenv`, imported in the imports cell) so that DATASET can be read from os.environ.
# The trailing ';' suppresses the boolean return value from being echoed as cell output.
load_dotenv();

In [None]:
# Path of the input Parquet dataset, supplied through the environment (or the .env file).
DATASET = os.getenv('DATASET')
if not DATASET:
    # Fail fast with a clear message instead of an obscure error from spark.read.parquet(None).
    raise ValueError("Environment variable DATASET is not set; define it in the environment or a .env file.")

# Desired number of rows in the output sample (Spark's sampling makes the actual count approximate).
NUMBER_SAMPLES = 100000

In [None]:
# Read the template data. Parquet files carry their own schema, so a single read is enough;
# re-reading the same files with the schema obtained from a first read adds nothing.
df_parser = spark.read.parquet(DATASET)
print(df_parser.count())
display(df_parser)

In [None]:
# Helper functions that extract one field from the last element of an array column
def get_last_organization_name(organizations):
    """
    Return the name of the final organization in ``organizations``.

    :param organizations: List of dictionaries representing organizations, each with a 'name' key.
    :return: The 'name' value of the last entry, or None if ``organizations`` is empty or None.
    """
    if not organizations:
        return None
    final_org = organizations[-1]
    return final_org['name']

def get_last_phone(phones):
    """
    Return the phone number stored in the final element of ``phones``.

    :param phones: List of dictionaries representing phone numbers, each with a 'number' key.
    :return: The 'number' value of the last entry, or None if ``phones`` is empty or None.
    """
    if not phones:
        return None
    final_phone = phones[-1]
    return final_phone['number']

def get_last_location(locations):
    """
    Extract the raw location from the last element of ``locations``.

    :param locations: List of dictionary-like location records with a 'raw_location' key
        (presumably pyspark Row objects when called through the UDF -- confirm against the schema).
    :return: The 'raw_location' value of the last entry, or None if ``locations`` is empty or None.
    """
    # The key must match the field name exactly; any stray whitespace in it makes the
    # lookup fail (KeyError) for every non-empty input.
    return locations[-1]['raw_location'] if locations else None

def get_last_email(emails):
    """
    Return the email address held by the final element of ``emails``.

    :param emails: List of dictionaries representing email addresses, each with an 'address' key.
    :return: The 'address' value of the last entry, or None if ``emails`` is empty or None.
    """
    if not emails:
        return None
    final_email = emails[-1]
    return final_email['address']

# Wrap each plain-Python extractor as a PySpark UDF. Applied to a DataFrame column, a UDF
# calls the wrapped function on each row's value and produces a new column of the declared
# return type -- a string for all four extractors here.
get_last_phone_udf = udf(f=get_last_phone, returnType=StringType())
get_last_location_udf = udf(f=get_last_location, returnType=StringType())
get_last_organization_name_udf = udf(f=get_last_organization_name, returnType=StringType())
get_last_email_udf = udf(f=get_last_email, returnType=StringType())

In [None]:
# Build new_df from df_parser with the columns name, phone, location, job_title,
# organization and email:
# - name / job_title: the nested `raw` field of those columns
#   (assumes both are structs with a `raw` field -- TODO confirm against the displayed schema);
# - phone / location / organization / email: a field of the LAST element of the matching
#   array column, extracted by the UDFs defined above.
new_df = df_parser.select(
    col("name.raw").alias("name"),
    get_last_phone_udf(col("phones")).alias("phone"),
    get_last_location_udf(col("locations")).alias("location"),
    col("job_title.raw").alias("job_title"),
    get_last_organization_name_udf(col("organizations")).alias("organization"),
    get_last_email_udf(col("emails")).alias("email"),
)

print(new_df.count())
display(new_df)

In [None]:
# Keep only complete records: a row is discarded if any of its columns is null/missing.
filtered_df = new_df.na.drop(how='any')

print(filtered_df.count())
display(filtered_df)

In [None]:
# Randomly sample (without replacement) about NUMBER_SAMPLES rows from filtered_df.
# DataFrame.sample is Bernoulli-style, so the resulting row count is approximate, not exact.
SAMPLE_SEED = 42  # fixed seed so re-running the notebook yields the same sample

filtered_count = filtered_df.count()
if filtered_count == 0:
    raise ValueError("filtered_df is empty; there is nothing to sample.")

# Without replacement Spark rejects fractions above 1, so when fewer rows than
# NUMBER_SAMPLES exist, clamp to 1.0 and keep every row.
fraction = min(1.0, NUMBER_SAMPLES / filtered_count)
sampled_df = filtered_df.sample(withReplacement=False, fraction=fraction, seed=SAMPLE_SEED)

print(sampled_df.count())
display(sampled_df)

In [None]:
# Write sampled_df as Parquet to 'ner_research/sample'. Mode 'overwrite' replaces any data
# already at that path, so re-running the notebook does not fail on an existing output.
sampled_df.write.mode('overwrite').parquet('ner_research/sample')