Batch update stream events #179

Merged
merged 14 commits on Jul 28, 2022
5 changes: 5 additions & 0 deletions scripts/README.md
@@ -20,6 +20,11 @@ To use the data conversion script `convert_spark_dataset_to_interactive.py`, use
```bash
python3 convert_spark_dataset_to_interactive.py --input_dir '/data/out-sf1' --output_dir '/data/out-sf1/graphs/csv/bi/composite-merged-fk'
```
To convert the data and add the `dependencyTime` column, use `convert_and_append.py`:
```bash
python3 convert_and_append.py --input_dir '/data/out-sf1'
```
This writes the output Parquet files under `/data/out-sf1/graphs/parquet/raw/composite-merged-fk`.
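The relevant part of that folder then looks roughly like this (the `_dep` folders hold the streams with the dependency time appended; see `add_dependent_time_interactive.py` below):
```
/data/out-sf1/graphs/parquet/raw/composite-merged-fk/
├── inserts/        # converted insert stream (Parquet)
├── deletes/        # converted delete stream (Parquet)
├── inserts_dep/    # insert stream with dependencyTime added
└── deletes_dep/    # delete stream with dependencyTime added
```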

## Data generation

139 changes: 139 additions & 0 deletions scripts/add_dependent_time_interactive.py
@@ -0,0 +1,139 @@
"""
Script to add dependentTime columns to the update events
for Interactive. The method for adding the dependent time is
the same as in the Hadoop Datagen:
https://github.com/ldbc/ldbc_snb_datagen_hadoop/blob/main/src/main/java/ldbc/snb/datagen/serializer/UpdateEventSerializer.java

The required inputs are the initial_snapshot Parquet files together
with the inserts and deletes Parquet files.
The resulting update streams are written to the same parent folder,
in new folders: inserts_dep and deletes_dep.
"""

import glob
import os
from pathlib import Path

import duckdb

from constants_dependent import schema_columns, dependent_entity_map

class DependentTimeAppender:

def __init__(self,
data_path:str,
dependent_date_column:str = "dependentDate",
default_dependent_time:int = 0
):
if data_path.endswith('/'):
data_path = data_path[:-1]

self.initial_snapshot_path = data_path + "/dynamic"
self.update_event_path = data_path
self.dependent_date_column = dependent_date_column
self.cursor = duckdb.connect(database='snb.duckdb')
self.default_dependent_time = default_dependent_time

def create_views(self):
print("Create temp table")
with open('./schema_dependentTime.sql') as f:
schema_def = f.read()
self.cursor.execute(schema_def)

print("Creating views")
for entity in ["Person", "Post", "Comment", "Forum"]:
self.cursor.execute(f"CREATE VIEW {entity} AS SELECT * FROM read_parquet('{self.initial_snapshot_path}/{entity}/*.parquet');")

def create_and_load_temp_tables(self):
"""
Loads the update event data into temporary tables (ordered by
creation/deletion date) and writes each table back to Parquet with its
dependent time filled in; Person inserts are written out as-is.
"""

for update_type in ['inserts', 'deletes']:
paths = glob.glob(f'{self.update_event_path}/{update_type}/*.parquet')
for path in paths:
operation_type = os.path.basename(path.removesuffix('.parquet'))
if update_type == 'deletes':
operation_type_suffix = "_Delete"
date_column = 'deletionDate'
else:
operation_type_suffix = "_Insert"
date_column = 'creationDate'
table_name = operation_type + operation_type_suffix
print("Parsing: " + table_name)

# 1. Create the select list
column_string = ", ".join(schema_columns[table_name])

# 2. Load data into temporary table
self.cursor.execute(f"INSERT INTO {table_name} SELECT {column_string} FROM read_parquet('" + path + f"') ORDER BY {date_column} ASC;")
if (table_name == "Person_Insert"):
Path(f"{self.update_event_path}/{update_type}_dep").mkdir(parents=True, exist_ok=True)
self.cursor.execute(f"COPY {table_name} TO '{self.update_event_path}/{update_type}_dep/{operation_type}.parquet' (FORMAT PARQUET);")
continue
# 3. Add the dependent time for tables that depend on other entities
self.update_dependent_time(table_name, f'{self.update_event_path}/{update_type}', operation_type)


def update_dependent_time(self, table_name, output_path, operation_type):
"""
For each table:
- Fetch the creationDate of the dependent entities
- Take the maximum of those dates as the dependencyTime
- Throw a ValueError when a value is not found
- Store the result to Parquet, drop the table and continue
"""
dependent_dict = dependent_entity_map[table_name]
dependent_tables = dependent_dict["entity"]
dependent_column_ids = dependent_dict["eventColumns"]
entity_columns = dependent_dict["entityColumns"]
date_column = dependent_dict["dateColumn"]
i = 1
# Build the SELECT list of dependent date columns and the LEFT JOIN clauses,
# one joined entity alias (t1, t2, ...) per dependency
select_date_columns = ""
left_join_tables = ""
for table, event_id, entity_column in zip(
dependent_tables,
dependent_column_ids,
entity_columns
):
select_date_columns += f"t{i}.{date_column}, "
left_join_tables += f"LEFT JOIN {table} AS t{i} ON t.{event_id} = t{i}.{entity_column} "
i+=1
where_clause = ""
select_clause = ""

for match_column in dependent_dict["matchColumns"]:
select_clause += f"t.{match_column} as {match_column}_match, "
where_clause += f" {match_column} = {match_column}_match AND"

where_clause = where_clause[:-4] # strip the trailing " AND"

select_date_columns = select_date_columns[:-2] # remove space and comma
query = f"UPDATE {table_name} SET {self.dependent_date_column} = dependencyTime FROM ("
query += f"SELECT {select_clause} GREATEST({select_date_columns}) AS dependencyTime "
query += f"FROM {table_name} AS t "
query += f" {left_join_tables})"
query += f" WHERE{where_clause}"
print(query)
total_updated = self.cursor.execute(query).fetchall()
print(f"{table_name} has updated {total_updated}")
Path(f"{output_path}_dep").mkdir(parents=True, exist_ok=True)
self.cursor.execute(f"COPY {table_name} TO '{output_path}_dep/{operation_type}.parquet' (FORMAT PARQUET);")
self.cursor.execute(f"DROP TABLE {table_name};")

def execute(self, query):
return self.cursor.execute(query).fetchall()

if __name__ == "__main__":
root_data_path = '/home/gladap/repos/ldbc-data/spark/out-sf0.1/'
dta_data_path = root_data_path + 'graphs/parquet/raw/composite-merged-fk'

DTA = DependentTimeAppender(dta_data_path)
DTA.create_views()
DTA.create_and_load_temp_tables()
130 changes: 130 additions & 0 deletions scripts/constants_dependent.py
@@ -0,0 +1,130 @@
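# schema_columns: for each update-event table, the columns selected from the
# converted Parquet files, in the column order of the temporary tables defined
# in schema_dependentTime.sql (not part of this file). The literal '0' is
# presumably a placeholder for the dependent-time column that
# add_dependent_time_interactive.py fills in afterwards.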
schema_columns = {
"Comment_Insert":['creationDate', '0', 'id', 'locationIP', 'browserUsed', 'content', 'length', 'CreatorPersonId', 'LocationCountryId', 'ParentPostId', 'ParentCommentId', 'TagIds'],
"Forum_Insert":['creationDate', '0', 'id', 'title', 'ModeratorPersonid', 'TagIds'],
"Forum_hasMember_Person_Insert":['creationDate', '0', 'PersonId', 'ForumId'],
"Person_Insert":['creationDate', '0', 'id', 'firstName', 'lastName', 'gender', 'birthday', 'locationIP', 'browserUsed', 'LocationCityId', 'speaks', 'email', 'tagIds', 'studyAt', 'workAt'],
"Person_knows_Person_Insert":['creationDate', '0', 'Person1id', 'Person2id'],
"Person_likes_Comment_Insert":['creationDate', '0', 'PersonId', 'CommentId'],
"Person_likes_Post_Insert":['creationDate', '0', 'PersonId', 'PostId'],
"Post_Insert":['creationDate', '0', 'id', 'imageFile', 'locationIP', 'browserUsed', 'language', 'content', 'length', 'CreatorPersonId', 'ContainerForumId', 'LocationCountryId', 'TagIds'],

"Comment_Delete":['deletionDate', '0', 'id'],
"Forum_Delete":['deletionDate', '0', 'id'],
"Forum_hasMember_Person_Delete":['deletionDate', '0','ForumId', 'PersonId'],
"Person_Delete":['deletionDate', '0', 'id'],
"Person_knows_Person_Delete":['deletionDate', '0','Person1id', 'Person2id'],
"Person_likes_Comment_Delete":['deletionDate', '0', 'PersonId', 'CommentId'],
"Person_likes_Post_Delete":['deletionDate', '0', 'PersonId', 'PostId'],
"Post_Delete":['deletionDate', '0', 'id'],
}

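# dependent_entity_map describes, for each update-event table, how its
# dependencyTime is computed (see update_dependent_time in
# add_dependent_time_interactive.py):
#   entity        - entity views (initial snapshot) joined to fetch the dates
#   eventColumns  - event columns referencing those entities
#   entityColumns - key columns of the entity views to join on
#   matchColumns  - event columns used to match the computed value back to the row
#   dateColumn    - date column whose maximum becomes the dependencyTime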
dependent_entity_map = {
# Inserts
"Comment_Insert":{
"entity":["Person", "Post", "Forum"],
"eventColumns": ["CreatorPersonId", "ParentPostId", "ParentCommentId"],
"entityColumns":["id", "id", "id"],
"matchColumns": ["id"],
"dateColumn":"creationDate"
},
"Forum_Insert":{
"entity":["Person"],
"eventColumns": ["ModeratorPersonId"],
"entityColumns":["id"],
"matchColumns": ["id"],
"dateColumn":"creationDate"
},
"Forum_hasMember_Person_Insert":{
"entity":["Person", "Forum"],
"eventColumns": ["PersonId", "ForumId"],
"entityColumns":["id", "id"],
"matchColumns":["PersonId", "ForumId"],
"dateColumn":"creationDate"
},
# "Person_Insert":[], # uses default value
"Person_knows_Person_Insert":{
"entity":["Person", "Person"],
"eventColumns": ["Person1id", "Person2id"],
"entityColumns":["id", "id"],
"matchColumns": ["Person1id", "Person2id"],
"dateColumn":"creationDate"
},
"Person_likes_Comment_Insert":{
"entity":["Person", "Comment"],
"eventColumns": ["PersonId", "CommentId"],
"entityColumns":["id", "id"],
"matchColumns":["PersonId", "CommentId"],
"dateColumn":"creationDate"
},
"Person_likes_Post_Insert":{
"entity":["Person", "Post"],
"eventColumns": ["PersonId", "PostId"],
"entityColumns":["id", "id"],
"matchColumns":["PersonId", "PostId"],
"dateColumn":"creationDate"
},
"Post_Insert":{
"entity":["Person", "Post"],
"eventColumns": ["CreatorPersonId", "ContainerForumId"],
"entityColumns":["id", "id"],
"matchColumns":["CreatorPersonId", "ContainerForumId"],
"dateColumn":"creationDate"
},
# Deletes
"Comment_Delete":{
"entity":["Comment",],
"eventColumns": ["id"],
"entityColumns":["id"],
"matchColumns": ["id"],
"dateColumn":"creationDate"
},
"Forum_Delete":{
"entity":["Forum",],
"eventColumns": ["id"],
"entityColumns":["id"],
"matchColumns": ["id"],
"dateColumn":"creationDate"
},
"Forum_hasMember_Person_Delete":{
"entity":["Forum","Person"],
"eventColumns": ["ForumId", "PersonId"],
"entityColumns":["id", "id"],
"matchColumns": ["ForumId", "PersonId"],
"dateColumn":"creationDate"
},
"Person_Delete":{
"entity":["Person",],
"eventColumns": ["id"],
"entityColumns":["id"],
"matchColumns": ["id"],
"dateColumn":"creationDate"
},
"Person_knows_Person_Delete":{
"entity":["Person", "Person"],
"eventColumns": ["Person1id", "Person2id"],
"entityColumns":["id", "id"],
"matchColumns":["Person1id", "Person2id"],
"dateColumn":"creationDate"
},
"Person_likes_Comment_Delete":{
"entity":["Person", "Comment"],
"eventColumns": ["PersonId", "CommentId"],
"entityColumns":["id", "id"],
"matchColumns": ["PersonId", "CommentId"],
"dateColumn":"creationDate"
},
"Person_likes_Post_Delete":{
"entity":["Person", "Post"],
"eventColumns": ["PersonId", "PostId"],
"entityColumns":["id", "id"],
"matchColumns": ["PersonId", "PostId"],
"dateColumn":"creationDate"
},
"Post_Delete":{
"entity":["Post"],
"eventColumns": ["id"],
"entityColumns":["id"],
"matchColumns": ["id"],
"dateColumn":"creationDate"
}
}
29 changes: 29 additions & 0 deletions scripts/convert_and_append.py
@@ -0,0 +1,29 @@
"""
Script to convert the update stream CSVs to Parquet and add the dependency time column
"""

from add_dependent_time_interactive import DependentTimeAppender
from convert_spark_dataset_to_interactive import convert_deletes, convert_inserts
import argparse
import os

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_dir',
help="input_dir: directory containing the data e.g. '/data/out-sf1'",
type=str,
required=True
)
args = parser.parse_args()

root_data_path = args.input_dir
dta_data_path = os.path.join(root_data_path, 'graphs/parquet/raw/composite-merged-fk')

convert_inserts(root_data_path, dta_data_path)
convert_deletes(root_data_path, dta_data_path)

DTA = DependentTimeAppender(dta_data_path)
DTA.create_views()
DTA.create_and_load_temp_tables()
