In [1]:
from bs4 import BeautifulSoup
import urllib.request
import requests
from pprint import pprint
import pandas as pd
import numpy as np
import json
import copy
from datetime import datetime, timezone
from google.cloud import storage
import gcsfs
import re
from sqlalchemy import create_engine, text
from typing import List

In [2]:
postgres_config = {
    "host": "findy-medium-stage.czmgcqkw4ett.ap-southeast-1.rds.amazonaws.com",
    "database": "findy_medium_stage",
    "user": "postgres",
    "password": "F!nDy!Med!umStage2o24",
    "port": "5432"
}

In [3]:
def build_upsert_query(cols: List[str],
                       table_name: str,
                       unique_key: List[str]=[],
                       cols_not_for_update: List[str] = None) -> str:
    """
    Builds postgres upsert query using input arguments.
    Note: In the absence of unique_key, this will be just an insert query.
    Example : build_upsert_query(
        ['col1', 'col2', 'col3', 'col4'],
        "my_table",
        ['col1'],
        ['col2']
    ) ->
    INSERT INTO my_table (col1, col2, col3, col4) VALUES %s
    ON CONFLICT (col1) DO UPDATE SET (col3, col4) = (EXCLUDED.col3, EXCLUDED.col4) ;
    :param cols: the postgres table columns required in the
        insert part of the query.
    :param table_name: the postgres table name.
    :param unique_key: unique_key of the postgres table for checking
        unique constraint violations.
    :param cols_not_for_update: columns in cols which are not required in
        the update part of upsert query.
    :return: Upsert query as per input arguments.
    """
    cols = [f'"{col}"' for col in cols]
    cols_str = ', '.join(cols)
    insert_query = """ INSERT INTO %s (%s) VALUES %%s """ % (
        table_name, cols_str
    )
    if cols_not_for_update is not None:
        cols_not_for_update.extend(unique_key)
    else:
        cols_not_for_update = [col for col in unique_key]
    cols_not_for_update = [f'"{col}"' for col in cols_not_for_update]
    unique_key = [f'"{col}"' for col in unique_key]
    unique_key_str = ', '.join(unique_key)

    update_cols = [f"{col}" for col in cols if col not in cols_not_for_update]
    update_cols_str = ', '.join(update_cols)
    update_cols_with_excluded_markers = [f'EXCLUDED.{col}' for col in update_cols]
    update_cols_with_excluded_markers_str = ', '.join(
        update_cols_with_excluded_markers
    )
    if len(update_cols) > 1:
        equality_clause = "(%s) = (%s)"
    else:
        equality_clause = "%s = %s"

    on_conflict_clause = f""" ON CONFLICT (%s) DO UPDATE SET {equality_clause} ;"""
    on_conflict_clause = on_conflict_clause % (
        unique_key_str,
        update_cols_str,
        update_cols_with_excluded_markers_str
    )
    if len(unique_key) == 0:
        return insert_query
    return insert_query + on_conflict_clause

In [4]:
storage_client = storage.Client(project="ytone-430507")
today = datetime.now()
table_name = "douyin_influencer"
api_name = "get_author_base_info"
bucket_name = "3_staging_area"

In [5]:
engine = create_engine("postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(**postgres_config))
l_df = pd.read_sql_table("douyin_influencer",con=engine)
l_df

Unnamed: 0,id,user_id,short_id,nickname,image_url,fans_count,city,province,level,order_cnt,...,total_favour_cnt,cooperate_index,cp_index,growth_index,shopping_index,spread_index,top_score,deleted,tags_ids,updated_hf
0,33562,7313163788587368474,,叶若兮,https://p26.douyinpic.com/aweme/1080x1080/awem...,425451.0,,,,,...,,839100.0,710300.0,,868400.0,883900.0,806200.0,False,"[60, 130, 72]",NaT
1,379599,6938466774778118182,,岩石哥@家居装企,https://p3.douyinpic.com/aweme/1080x1080/aweme...,97302.0,佛山市,广东省,,,...,,,,,,,,False,[91],NaT
2,269646,7020201429570682910,,世俗阳光,https://p3.douyinpic.com/aweme/1080x1080/aweme...,11062.0,深圳,,,,...,,,,,,,,False,[100],NaT
3,269647,6856627727447425031,,Adım Gu Lin.,https://p3.douyinpic.com/aweme/1080x1080/aweme...,22165.0,徐州市,江苏省,,,...,,,,,,,,False,[100],NaT
4,269648,6990883355331919886,,威海凯锅(门市租售),https://p3.douyinpic.com/aweme/1080x1080/aweme...,22915.0,威海市,山东省,,,...,,,,,,,,False,[100],NaT
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
161393,59503,6870169854523998221,,巧夺天工,https://p11.douyinpic.com/aweme/1080x1080/awem...,199555.0,赣州市,江西省,,,...,,760500.0,663400.0,,636500.0,411500.0,564200.0,False,[130],NaT
161394,59492,6870169532812509191,,足球叔叔,https://p11.douyinpic.com/aweme/1080x1080/awem...,176183.0,,,,,...,,760699.0,654500.0,,362600.0,328400.0,496700.0,False,[23],NaT
161395,59510,6698507560422473741,,王者阿闪,https://p3.douyinpic.com/aweme/1080x1080/aweme...,168233.0,,香港特别行政区,,,...,,814800.0,428400.0,,477600.0,374900.0,498800.0,False,[23],NaT
161396,59517,7028140674297888781,,吃不饱同学,https://p3.douyinpic.com/aweme/1080x1080/aweme...,14601.0,南昌,,,,...,,750000.0,0.0,,252000.0,270400.0,182100.0,False,[48],NaT


In [6]:
processing_blobs = [
{
    "blob": blob,
    "date": blob.name.split('/')[2],
    "batch": int(blob.name.split('/')[-1].replace(".parquet", "").split("_")[-1]),
} for blob in storage_client.list_blobs("3_staging_area",prefix="1_xingtu/douyin_influencer/") if api_name in blob.name]
processing_blobs

[{'blob': <Blob: 3_staging_area, 1_xingtu/douyin_influencer/2024-09-11/get_author_base_info_240920_0.parquet, 1726803504667584>,
  'date': '2024-09-11',
  'batch': 0},
 {'blob': <Blob: 3_staging_area, 1_xingtu/douyin_influencer/2024-09-17/get_author_base_info_240920_0.parquet, 1726803627188174>,
  'date': '2024-09-17',
  'batch': 0},
 {'blob': <Blob: 3_staging_area, 1_xingtu/douyin_influencer/2024-09-18/get_author_base_info_240920_0.parquet, 1726803629892657>,
  'date': '2024-09-18',
  'batch': 0},
 {'blob': <Blob: 3_staging_area, 1_xingtu/douyin_influencer/2024-09-19/get_author_base_info_240920_0.parquet, 1726803631833151>,
  'date': '2024-09-19',
  'batch': 0},
 {'blob': <Blob: 3_staging_area, 1_xingtu/douyin_influencer/2024-10-09/get_author_base_info_241009_0.parquet, 1728457802828707>,
  'date': '2024-10-09',
  'batch': 0},
 {'blob': <Blob: 3_staging_area, 1_xingtu/douyin_influencer/2024-10-11/get_author_base_info_241011_0.parquet, 1728634779984729>,
  'date': '2024-10-11',
  'batc

In [7]:
bucket = storage_client.get_bucket(bucket_name)
meta_blob = bucket.blob("1_xingtu/douyin_influencer/meta.json")
if meta_blob.exists():
    processed_blobs = json.loads(meta_blob.download_as_string())
else:
    processed_blobs = []
processed_blobs

[{'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_0.parquet',
  'date': '2024-09-13',
  'batch': 0},
 {'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_1.parquet',
  'date': '2024-09-13',
  'batch': 1},
 {'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_0.parquet',
  'date': '2024-10-02',
  'batch': 0},
 {'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_1.parquet',
  'date': '2024-10-02',
  'batch': 1},
 {'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_2.parquet',
  'date': '2024-10-02',
  'batch': 2},
 {'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_3.parquet',
  'date': '2024-10-02',
  'batch': 3},
 {'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_4.parquet',
  'date': '2024-10-02',
  'batch': 4},
 {'file_path': 'gs://3_staging_area/1_xingtu/douyin_price/handler_post_241009_5.parquet',
  'date

In [8]:
to_process = []
for processing_blob in processing_blobs:
    processing_date = datetime.strptime(processing_blob["date"], "%Y-%m-%d")
    if processing_date >= datetime(2024, 9, 8):
        if processing_blob["batch"] not in [processed_blob["batch"] for processed_blob in processed_blobs if api_name in processed_blob["file_path"] and processing_blob["date"] == processed_blob["date"]]:
            to_process.append(processing_blob)
pprint(to_process)
print(len(to_process))

[{'batch': 0,
  'blob': <Blob: 3_staging_area, 1_xingtu/douyin_influencer/2024-10-11/get_author_base_info_241011_0.parquet, 1728634779984729>,
  'date': '2024-10-11'}]
1


In [9]:
def remove_rows_with_integer(value):
    value = value.replace("{", "").replace("}", "")
    value = value.split(",")
    try:
        value = [int(i) for i in value]
    except Exception:
        value = None
    return value

In [10]:
def check_all_value(value):
    result = []
    if value:
        for i in value:
            if i <= 32767:
                result.append(True)
            else:
                result.append(False)
    return result

In [11]:
for item in to_process:
    r_df = pd.read_parquet("gs://" + bucket_name + "/" + item["blob"].name)
    r_df["tags_ids_level_two"] = r_df["tags_ids_level_two"].str.replace("[","{").str.replace("]","}")
    r_df["tags_ids"] = r_df["tags_ids"].str.replace("[","{").str.replace("]","}")
    r_df["create_time"] = r_df["create_time"].astype(str)
    r_df["modify_time"] = r_df["modify_time"].astype(str)
    r_df["deleted"] = False
    df = pd.merge(l_df, r_df, on="user_id", how="right")
    df.columns = [column.replace("_y", "") for column in df.columns]
    upsert_df = df[r_df.columns]
    upsert_df = upsert_df[upsert_df['tags_ids_level_two'].apply(remove_rows_with_integer).apply(check_all_value).apply(all)]
    upsert_df.nickname = upsert_df.nickname.str.replace("'", "''")
    upsert_df = upsert_df.drop_duplicates("core_user_id")
    query = build_upsert_query(upsert_df.columns, "douyin_influencer", ["core_user_id"])
    value = ", ".join([str(record).replace("''", "NULL") for record in upsert_df.fillna("").to_records(index=False)])
    query = query % value
    with engine.connect() as conn:
        result = conn.execute(text(query))
        conn.commit()
    processed_blobs.append({
        'file_path': "gs://" + bucket_name + "/1_xingtu/douyin_influencer/" + api_name + "_" + today.strftime("%y%m%d") + "_" + str(item["batch"]) + ".parquet",
        'date': item["date"],
        'batch': item["batch"]
    })
    meta_blob.upload_from_string(json.dumps(processed_blobs))