Skip to content

Commit

Permalink
Merge pull request #1 from stewartbryson/stewart
Browse files Browse the repository at this point in the history
Temp table for finwire.
  • Loading branch information
stewartbryson committed Jul 31, 2023
2 parents 63fbfdf + 3c5d311 commit a65ec05
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/release-drafter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
steps:
# Drafts your next Release notes as Pull Requests are merged into "main"
- uses: release-drafter/release-drafter@v5
# with:
# disable-autolabeler: true
with:
commitish: main
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
30 changes: 13 additions & 17 deletions tpcdi.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def save_df(
else:
df.write \
.mode("overwrite") \
.save_as_table(table_name)
.save_as_table(table_name) \

print(f"{table_name.upper()} table created.")

Expand Down Expand Up @@ -416,13 +416,20 @@ def get_phone_number(
stage_path = get_stage_path(stage, con_file_name)
upload_files(con_file_name, stage_path)

# CMP record types
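# Read the staged file once for all record types, adding the rec_type and pts columns,
# and save the result as the temporary table "finwire" that the CMP, SEC and FIN queries read from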
df = session \
.read \
.schema(schema) \
.option('field_delimiter', '|') \
.csv(stage_path) \
.with_column('rec_type', substring(col("line"), lit(16), lit(3))) \
.with_column('pts', to_timestamp(substring(col("line"), lit(0), lit(15)), lit("yyyymmdd-hhmiss"))) \
.write.mode("overwrite").save_as_table("finwire", table_type="temporary")

# CMP record types
df = session \
.table('finwire') \
.where(col('rec_type') == 'CMP') \
.with_column('company_name', substr(col('line'), lit(19), lit(60))) \
.withColumn('cik', substring(col("line"), lit(79), lit(10))) \
Expand All @@ -438,18 +445,13 @@ def get_phone_number(
.withColumn('country', substring(col("line"), lit(324), lit(24))) \
.withColumn('ceo_name', substring(col("line"), lit(348), lit(46))) \
.withColumn('description', substring(col("line"), lit(394), lit(150))) \
.with_column('pts', to_timestamp(substring(col("line"), lit(0), lit(15)), lit("yyyymmdd-hhmiss"))) \
.drop(col('line')) \

save_df(df, 'cmp')

# SEC record types
df = session \
.read \
.schema(schema) \
.option('field_delimiter', '|') \
.csv(stage_path) \
.with_column('rec_type', substring(col("line"), lit(16), lit(3))) \
.table('finwire') \
.where(col('rec_type') == 'SEC') \
.withColumn('symbol', substring(col("line"), lit(19), lit(15))) \
.withColumn('issue_type', substring(col("line"), lit(34), lit(6))) \
Expand All @@ -461,18 +463,13 @@ def get_phone_number(
.withColumn('first_exchange_date', substring(col("line"), lit(141), lit(8))) \
.withColumn('dividend', substring(col("line"), lit(149), lit(12))) \
.withColumn('co_name_or_cik', substring(col("line"), lit(161), lit(60))) \
.with_column('pts', to_timestamp(substring(col("line"), lit(0), lit(15)), lit("yyyymmdd-hhmiss"))) \
.drop(col('line'))
.drop(col('line')) \

save_df(df, 'sec')

# FIN record types
df = session \
.read \
.schema(schema) \
.option('field_delimiter', '|') \
.csv(stage_path) \
.with_column('rec_type', substring(col("line"), lit(16), lit(3))) \
.table('finwire') \
.where(col('rec_type') == 'FIN') \
.withColumn('year', substring(col("line"), lit(19), lit(4))) \
.withColumn('quarter', substring(col("line"), lit(23), lit(1))) \
Expand All @@ -489,8 +486,7 @@ def get_phone_number(
.withColumn('sh_out', substring(col("line"), lit(161), lit(13))) \
.withColumn('diluted_sh_out', substring(col("line"), lit(174), lit(13))) \
.withColumn('co_name_or_cik', substring(col("line"), lit(187), lit(60))) \
.with_column('pts', to_timestamp(substring(col("line"), lit(0), lit(15)), lit("yyyymmdd-hhmiss"))) \
.drop(col("line"))
.drop(col("line")) \

save_df(df, 'fin')

Expand Down

0 comments on commit a65ec05

Please sign in to comment.