***Import the necessary libraries***

In [54]:
#Import the necessary libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, concat_ws, row_number, desc
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [55]:
#Import the necessary libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col, concat_ws, row_number, desc
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [56]:
print("test")

test


***Initialize the the connection from the Spark to the postgresql database***

In [57]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName('postgresql_connection') \
    .getOrCreate()


In [58]:
# PostgreSQL connection parameters
database = "skyminyr_development"
user = "postgres"
password = "password"
url = f"jdbc:postgresql://global-db:5432/{database}"

In [59]:
# Read table names from PostgreSQL metadata
table_names = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "(SELECT table_name FROM information_schema.tables WHERE table_schema = 'public') as tables") \
    .load()

In [60]:


# Convert the DataFrame to a list of table names
table_list = table_names.select("table_name").rdd.flatMap(lambda x: x).collect()

# Print the list of table names
print("Available Tables:")
for table_name in table_list:
    print(table_name)

# Now you can use this list to access each table individually

Available Tables:
ar_internal_metadata
companies
company_annual_revenues
company_customers
company_events
company_funding_round_investors
company_funding_rounds
company_headcounts
company_locations
company_names
company_sectors
company_social_urls
company_stock_tickers
people
person_customers
person_educations
person_employments
person_social_urls
schema_migrations


In [61]:
# Function to load data from a table into a DataFrame
def load_table(table_name):
    df = spark.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .load()
    return df

***Load the table into the dataframes***

In [62]:
# Load data from the people related tables into a DataFrame
people_df = load_table("people")
person_customer_df = load_table("person_customers")
person_educations_df = load_table("person_educations")
person_employments_df = load_table("person_employments")
person_social_urls_df = load_table("person_social_urls")
company_sectors_df = load_table("company_sectors")
company_headcounts_df = load_table("company_headcounts")
company_annual_revenues_df = load_table("company_annual_revenues")
company_stock_tickers_df = load_table("company_stock_tickers")
company_funding_rounds_df = load_table("company_funding_rounds")

***Function to check the uniqueness of data***

In [63]:
#function to check the uniqueness of the dataframe
def check_uniqueness(df, column_name):
    # Add a new column 'is_duplicate' that flags if column_name is duplicated
    df_duplicates_check = df.withColumn('is_duplicate', count(column_name).over(Window.partitionBy(column_name)) > 1)

    # If any 'is_duplicate' is True, then DataFrame is not unique based on column_name
    if df_duplicates_check.filter(col('is_duplicate')).count() > 0:
        print(f"DataFrame is not unique based on {column_name}")
    else:
        print(f"DataFrame is unique based on {column_name}")

In [64]:
check_uniqueness(people_df, 'id')

DataFrame is unique based on id


***Function to check the counts of data***

In [65]:
def count_records(df):
    """
    This function counts the number of records in a DataFrame.

    Parameters:
    df (DataFrame): The DataFrame for which to count the records.

    Returns:
    int: The number of records in the DataFrame.
    """
    # Use the count() function to count the number of rows in the DataFrame
    num_records = df.count()

    # Return the number of records
    return num_records

***People Table Transformation***

In [66]:
# Select specific columns from the 'people_df' DataFrame, rename the 'id' column to 'person_id',
# and concatenate 'address', 'city', 'region', 'postal_code', 'country' columns with a comma separator
people_selected_df = people_df.select('id', 'name', 'address', 'city', 'region', 'postal_code', 'country', 'headline','description', 'created_at', 'updated_at')\
                              .withColumnRenamed('id', 'person_id')\
                              .withColumn('address', concat_ws(', ', 'address', 'city', 'region', 'postal_code', 'country'))

people_selected_df = people_selected_df.select('person_id', 'name', 'address','headline','description', 'created_at', 'updated_at')

# people_selected_df.show()

In [67]:
count_records(people_selected_df)

999

***Transformation for the "person_customer" table***

In [68]:
count_records(person_customer_df)

# person_customer_df.show()

166

In [69]:
# Call the 'check_uniqueness' function on 'person_customer_df' DataFrame to check if 'person_id' is unique
check_uniqueness(person_customer_df,'person_id')

DataFrame is not unique based on person_id


In [70]:
# Group by 'person_id' and count the number of occurrences of each 'person_id'
duplicate_rows = person_customer_df.groupBy('person_id').agg(count('*').alias('count'))

# Filter the rows where 'count' is greater than 1 (i.e., 'person_id' is duplicated)
duplicate_rows = duplicate_rows.filter(duplicate_rows['count'] > 1)

# Show the duplicate rows
duplicate_rows.show()
count_records(duplicate_rows)

+---------+-----+
|person_id|count|
+---------+-----+
|693540132|    2|
+---------+-----+



1

In [71]:
# Define a window partitioned by person_id and ordered by updated_at in descending order
window = Window.partitionBy("person_id").orderBy(desc("updated_at"))

# Add a row_number column to the DataFrame
person_customer_df = person_customer_df.withColumn("rn", row_number().over(window))

# Filter the DataFrame to keep only the rows with rn = 1 (i.e., the latest updated_at for each person_id)
person_customer_df = person_customer_df.filter(person_customer_df.rn == 1)

# Drop the rn column
person_customer_df = person_customer_df.drop("rn")

count_records(person_customer_df)

165

In [72]:
# Call the 'check_uniqueness' function on 'person_customer_df' DataFrame to check if 'person_id' is unique
check_uniqueness(person_customer_df,'person_id')

DataFrame is unique based on person_id


In [73]:
# Select 'person_id' and 'customer_id' columns from 'person_customer_df' DataFrame
people_selected_customer_df = person_customer_df.select('person_id', 'customer_id')

# Join 'people_selected_df' with 'people_selected_customer_df' on 'person_id'
people_person_customer_df = people_selected_df.join(people_selected_customer_df, on='person_id', how='left')

# people_person_customer_df.show()

In [74]:
count_records(people_person_customer_df)

999

***Transformation for "Person Education" table***

In [75]:
count_records(person_educations_df)
# person_educations_df.show()

1209

In [76]:
# Define a struct for each institute
institute_struct = F.struct(
    "institution_id", "institution_name", "degree", "subject", "started_on", "ended_on"
)

# Group by person_id and collect list of institute details as structs
grouped_educations_df = person_educations_df.groupBy("person_id").agg(
    F.collect_list(institute_struct).alias("education")
    # ("education - (institution_id, institution_name, degree, subject, started_on, ended_on)")
)

person_education_group_df = grouped_educations_df



# Show the result
# person_education_group_df.show(truncate=False)


In [77]:
check_uniqueness(person_education_group_df, 'person_id')

DataFrame is unique based on person_id


In [78]:
count_records(person_education_group_df)

500

In [79]:
# Joining person_education_group_df and people_person_customer_df DataFrames on column named 'person_id'

people_person_customer_education_df = people_person_customer_df.join(person_education_group_df, on='person_id', how='left') 

# people_person_customer_education_df.show()

count_records(people_person_customer_education_df)

999

***Transformation for Company related information***

In [80]:
count_records(company_sectors_df)

5711

In [81]:
# Group the DataFrame 'company_sectors_df' by 'company_id'
grouped_company_sectors_df = company_sectors_df.groupBy('company_id').agg(
    F.collect_list('sector').alias('sectors')
)

# Check the DataFrame 'grouped_company_sectors_df'
# grouped_company_sectors_df.show()

count_records(grouped_company_sectors_df)

1000

In [82]:
count_records(company_annual_revenues_df)

18532

In [83]:
# Define a window partitioned by 'company_id' and ordered by 'date' in descending order
window = Window.partitionBy('company_id').orderBy(F.desc('date'))

# Add a new column 'rank' to the DataFrame 'company_annual_revenues_df'
# The 'rank' is calculated over the defined window
grouped_company_annual_revenues_df = company_annual_revenues_df.withColumn('rank', F.rank().over(window))

# Filter the DataFrame to keep only the rows where 'rank' is 1
# This gives us the latest annual revenue for each company
# Select only the 'company_id' and 'amount_usd' columns for the final DataFrame
latest_company_annual_revenues_df = grouped_company_annual_revenues_df.filter(F.col('rank') == 1).select('company_id', 'amount_usd')

# Check the DataFrame 'latest_company_annual_revenues_df'
# latest_company_annual_revenues_df.show()

count_records(latest_company_annual_revenues_df)

999

In [84]:
count_records(company_headcounts_df)

1000

In [85]:
# Define a window partitioned by 'headcount' and ordered by 'date' in descending order
window = Window.partitionBy('headcount').orderBy(F.desc('date'))

# Add a new column 'rank' to the DataFrame 'company_headcounts_df'
# The 'rank' is calculated over the defined window
grouped_company_headcounts_df = company_headcounts_df.withColumn('rank', F.rank().over(window))

# Filter the DataFrame to keep only the rows where 'rank' is 1
# This gives us the latest headcount for each company
# Select only the 'company_id' and 'headcount' columns for the final DataFrame
latest_company_headcounts_df = grouped_company_headcounts_df.filter(F.col('rank') == 1).select('company_id', 'headcount')

# Check the DataFrame 'latest_company_headcounts_df'
# latest_company_headcounts_df.show()

count_records(latest_company_headcounts_df)

847

In [86]:
# Select the 'company_id' and 'stock_ticker' columns from the DataFrame 'company_stock_tickers_df'
selected_company_stock_tickers_df = company_stock_tickers_df.select('company_id','stock_ticker')
count_records(selected_company_stock_tickers_df)

1000

In [87]:
# Define a Window specification: partition by 'company_id' and order by 'company_id', 'date' (in descending order), and 'updated_at' (in descending order)
windowSpec = Window.partitionBy(company_funding_rounds_df['company_id']).orderBy(company_funding_rounds_df['company_id'], company_funding_rounds_df['date'].desc(), company_funding_rounds_df['updated_at'].desc())

# Add a new column 'rn' to the DataFrame 'company_funding_rounds_df' which contains row numbers within each window partition
grouped_company_funding_rounds_df = company_funding_rounds_df.withColumn("rn", row_number().over(windowSpec))

# Filter the DataFrame to keep only the rows with 'rn' equal to 1 (i.e., the most recent funding round for each company), then drop the 'rn' column
filtered_company_funding_rounds_df = grouped_company_funding_rounds_df.filter(col("rn") == 1).drop("rn")

# Select the columns 'company_id', 'name', 'investor_company_id', and 'investor' from the filtered DataFrame
filtered_company_funding_rounds_df.select('company_id','name', 'investor_company_id','investor')

DataFrame[company_id: bigint, name: string, investor_company_id: bigint, investor: string]

***Joined the company information related transformed dataframes***

In [88]:

# Join the DataFrame 'grouped_company_sectors_df' with 'latest_company_annual_revenues_df', 'latest_company_headcounts_df', and 'selected_company_stock_tickers_df' on 'company_id'
# The join type is 'left', meaning only the rows with a match in both DataFrames will be kept
company_info_joined_df = grouped_company_sectors_df.join(
    latest_company_annual_revenues_df, 'company_id', 'left'
).join(
    latest_company_headcounts_df, 'company_id', 'left'
).join(
    selected_company_stock_tickers_df, 'company_id', 'left'
).join(
    filtered_company_funding_rounds_df, 'company_id', 'left'
)

# count_records(company_info_joined_df)

# Select the 'company_id', 'amount_usd', 'headcount', and 'stock_ticker' columns from the joined DataFrame
resultant_company_info_joined_df = company_info_joined_df.select(
    'company_id',
    company_info_joined_df['name'].alias('funding_name'), 'investor_company_id','investor',
    latest_company_annual_revenues_df['amount_usd'],
    latest_company_headcounts_df['headcount'],
    selected_company_stock_tickers_df['stock_ticker']
)

# Check the DataFrame 'resultant_company_info_joined_df'
# resultant_company_info_joined_df.show()

count_records(resultant_company_info_joined_df)


1000

***Transformation for the Person Employment Table***

In [89]:
count_records(person_employments_df)
# person_employments_df.show()

7898

In [90]:
person_employments_df.columns

['id',
 'person_id',
 'company_id',
 'company_name',
 'seniority_level',
 'title',
 'started_on',
 'ended_on',
 'created_at',
 'updated_at']

In [91]:
resultant_company_info_joined_df.columns

['company_id',
 'funding_name',
 'investor_company_id',
 'investor',
 'amount_usd',
 'headcount',
 'stock_ticker']

In [108]:
# Join the DataFrames on 'company_id'
resultant_company_info_employment_joined_df = person_employments_df.join(resultant_company_info_joined_df, 'company_id', 'left')

count_records(resultant_company_info_employment_joined_df)

# Define a new struct that includes the additional columns
emplopyment_struct = F.struct(
    "company_id", "company_name", "seniority_level", "title", "started_on", "ended_on",
    resultant_company_info_joined_df['amount_usd'],
    resultant_company_info_joined_df['headcount'],
    resultant_company_info_joined_df['stock_ticker'],
    resultant_company_info_joined_df['funding_name'],
    resultant_company_info_joined_df['investor_company_id'],
    resultant_company_info_joined_df['investor']

)

# Group by 'person_id' and collect list of employment details as structs
grouped_person_employments_df = resultant_company_info_employment_joined_df.groupBy("person_id").agg(
    F.collect_list(emplopyment_struct).alias("employments")
)

grouped_person_employments_df.show(1,truncate=False)


count_records(person_employments_df)

+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

7898

In [95]:
# Join the DataFrame 'people_person_customer_education_df' with 'person_employments_df' on 'person_id'
# The resulting DataFrame contains information about a person's education and employment
people_person_customer_education_educations_df = people_person_customer_education_df.join(grouped_person_employments_df, on='person_id', how='left') 

# people_person_customer_education_educations_df.show()

count_records(people_person_customer_education_educations_df)

999

***Transformation for the Person Social Urls Table***

In [96]:
# Group by person_id and collect list of URL details
grouped_urls_df = person_social_urls_df.groupBy("person_id").agg(
    F.collect_list(
        F.struct("url_type", "url")
    ).alias("urls")
)

# Show the result
# grouped_urls_df.show(truncate=False)

count_records(grouped_urls_df)

999

In [97]:
check_uniqueness(grouped_urls_df, 'person_id')

DataFrame is unique based on person_id


In [98]:
# Joining person_education_group_df and people_person_customer_df DataFrames on column named 'person_id'

people_person_customer_education_employments_social_urls_df = people_person_customer_education_educations_df.join(grouped_urls_df, on='person_id', how='left') 

count_records(people_person_customer_education_employments_social_urls_df)


999

In [99]:
check_uniqueness(people_person_customer_education_employments_social_urls_df, 'person_id')

DataFrame is unique based on person_id


***Final Dataframe***

In [100]:
# people_person_customer_education_employments_social_urls_df.printSchema()

In [101]:
people_person_customer_education_employments_social_urls_df.show()

+----------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| person_id|               name|             address|            headline|         description|          created_at|          updated_at|         customer_id|           education|         employments|                urls|
+----------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 949256266|        Vivian Weng|           SINGAPORE|Product Design, S...|                NULL|2023-12-11 06:18:...|                NULL|7538cb11-1c23-4c7...|[{NULL, Universit...|[{4007, McKinsey ...|[{linkedin, https...|
| 787025923|        Lynda Zhang|                    |Retail Ownership ...|                NULL|2023-12-11 06:18:

***Final Dataframe can be exported to files such as CSV, JSON, Parquet***

In [102]:
# Use the exiting dataframe
final_df = people_person_customer_education_employments_social_urls_df


In [106]:
final_df.show(1,truncate=False)

+---------+---------------+-------+----------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------------------+------------------------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [107]:
final_df.write.mode('overwrite').json('./work/final_df')

In [53]:
 final_df.printSchema()

root
 |-- person_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = false)
 |-- headline: string (nullable = true)
 |-- description: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- education: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- institution_id: long (nullable = true)
 |    |    |-- institution_name: string (nullable = true)
 |    |    |-- degree: string (nullable = true)
 |    |    |-- subject: string (nullable = true)
 |    |    |-- started_on: date (nullable = true)
 |    |    |-- ended_on: date (nullable = true)
 |-- employments: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- company_id: long (nullable = true)
 |    |    |-- company_name: string (nullable = true)
 |    |    |-- seniority_level: string (nullable = true)
 |    |    |-- t

In [51]:
import json

# Convert the DataFrame to a dictionary
json_dict = final_df.toJSON().collect()

# Write the dictionary to a JSON file
with open('./work/final_df/person_final_output.json', 'w') as f:
    json.dump(json_dict, f)

In [52]:
final_df.write.parquet("final_output/df4.parquet")