In [59]:
import json
import os
import re
import time
from datetime import datetime, timedelta
from pathlib import Path

import requests
from pandas import json_normalize
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, lit, concat_ws
from pyspark.sql.types import StructType

# Kept last, as before: any name vnstock re-exports (it may also provide requests,
# time, datetime, json_normalize) resolves exactly as it did originally.
from vnstock import *

In [60]:
import pkg_resources
pkg_resources.require("pandas==1.4.0")
import pandas as pd

In [61]:
# Spark session: a local-master session used below for the JDBC reads/writes
# and for parsing the fetched JSON.
spark = (
    SparkSession.builder
    .master("local")
    .appName("PySpark_Postgres_test")
    .getOrCreate()
)

# Get ticker list.
# Parsing: strip surrounding whitespace and brackets, split on commas, then strip quotes
# from each item. This accepts both a list literal such as ['AAA', 'BBB'] and the bare
# 'AAA', 'BBB' form, and tolerates a trailing newline.
# NOTE(review): the exact file format was inferred from the old parsing; confirm.
# The old `read_text()[1:-1].split("', '")` left a stray quote on the first and last
# ticker for the list-literal form, and corrupted the last ticker whenever the file
# ended with a newline, so requests for those tickers were malformed.
# The path can be overridden with the TICKER_FILE environment variable.
TICKER_FILE = Path(os.environ.get(
    "TICKER_FILE",
    "/home/jazzdung/Projects/financial-data-platform/Airflow/data/airflow/ticker/ticker.txt",
))
tickers = [
    ticker
    for ticker in (
        item.strip().strip("'\"")
        for item in TICKER_FILE.read_text().strip().strip("[]").split(",")
    )
    if ticker
]

# Database info. Each value can be overridden through an environment variable, so
# credentials no longer have to be edited into the notebook.
# FIXME: the literal password fallback is committed to source control. Remove it (and
# rotate the credential) once POSTGRES_PASSWORD is set in every environment.
USERNAME = os.environ.get("POSTGRES_USER", "postgres")
PASSWORD = os.environ.get("POSTGRES_PASSWORD", "02092001")
URL = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/financialDataPlatform")

In [70]:
#Read a table: load income_statement back from Postgres and count its rows.
df = (
    spark.read
    .format("jdbc")
    .options(
        url=URL,
        driver="org.postgresql.Driver",
        dbtable="income_statement",
        user=USERNAME,
        password=PASSWORD,
    )
    .load()
)

df.count()

0

In [63]:
#Write to table
def data_to_db(df, table):
    """
    Overwrite Postgres table ``table`` with ``df`` through Spark's JDBC writer.

    Connection settings come from the module-level URL / USERNAME / PASSWORD.
    With mode 'overwrite' and ``truncate=true``, Spark truncates an existing
    table instead of dropping and recreating it.
    """
    writer = df.write.format('jdbc').mode('overwrite').option("truncate", "true")
    connection = dict(
        url=URL,
        driver='org.postgresql.Driver',
        dbtable=table,
        user=USERNAME,
        password=PASSWORD,
    )
    writer.options(**connection).save()

In [64]:
def get_ratio(report_type, table, timeout=30):

    '''
    Fetch one TCBS finance report for every ticker, write it to Postgres and return it.

    Use for cashflow, incomestatement, balancesheet, financialratio.

    Parameters
    ----------
    report_type : str
        "cashflow", "incomestatement" or "balancesheet" are requested from
        /finance/{ticker}/{report_type}. Any other value is requested from
        /finance/{ticker}/financialratio.
    table : str
        Destination table, overwritten via ``data_to_db``.
    timeout : float, optional
        Per-request timeout in seconds, so one stalled request cannot hang the run.

    Returns
    -------
    pyspark.sql.DataFrame
        All fetched rows, de-duplicated on (ticker, year, quarter) and ordered by
        ticker ascending, year descending, quarter descending.

    Raises
    ------
    RuntimeError
        If no request succeeded. Previously this surfaced as an unresolved-column
        error from ``dropDuplicates``.
    '''

    url_template = 'https://apipubaws.tcbs.com.vn/tcanalysis/v1/finance/{}/{}'
    payloads = []  # raw JSON response bodies, parsed together below
    failed = []    # (ticker, yearly) pairs whose request failed

    for ticker in tickers:
        # yearly=0 and yearly=1 are requested independently. Previously one try
        # wrapped both, so a failure on the first silently skipped the second.
        for yearly in range(2):
            try:
                if report_type in ["cashflow", "incomestatement", "balancesheet"]:
                    response = requests.get(url_template.format(ticker, report_type),
                                            params={'yearly': yearly, 'isAll': 'true'},
                                            timeout=timeout)
                else:
                    # requests serialises the bool as "True", matching the old
                    # hand-built "...financialratio?yearly={}&isAll=True" query.
                    response = requests.get(url_template.format(ticker, 'financialratio'),
                                            params={'yearly': yearly, 'isAll': True},
                                            timeout=timeout)
                # Error bodies used to be unioned into the result; skip them instead.
                response.raise_for_status()
            except requests.RequestException:
                failed.append((ticker, yearly))
                continue
            payloads.append(response.text)

    if failed:
        print('get_ratio({!r}): {} request(s) failed, e.g. {}'.format(
            report_type, len(failed), failed[:10]))
    if not payloads:
        raise RuntimeError('get_ratio({!r}): no data fetched for any ticker'.format(report_type))

    # Parse every body in a single read rather than unioning one DataFrame per
    # request, which grew the query plan with every ticker. Spark infers one schema
    # over all the rows.
    df = spark.read.json(spark.sparkContext.parallelize(payloads))

    #Clean
    df = df.dropDuplicates(["ticker", "year", "quarter"])
    df = df.orderBy(col("ticker").asc(), col("year").desc(), col("quarter").desc())

    #Write to db
    data_to_db(df, table)
    return df

# test = get_ratio('incomestatement', 'incomestatement').printSchema()

In [65]:
def get_rating(rating_type, table, timeout=30):

    '''
    Fetch one TCBS rating for every ticker, write it to Postgres and return it.

    Use for general, business-model, business-operation, financial-health, valuation.

    Parameters
    ----------
    rating_type : str
        Path segment in /tcanalysis/v1/rating/{ticker}/{rating_type}?fType=TICKER.
    table : str
        Destination table, overwritten via ``data_to_db``.
    timeout : float, optional
        Per-request timeout in seconds, so one stalled request cannot hang the run.

    Returns
    -------
    pyspark.sql.DataFrame
        Fetched rows de-duplicated on ``ticker`` and ordered by ticker. For
        ``rating_type == 'general'`` the ``stockRecommend`` column is dropped.

    Raises
    ------
    RuntimeError
        If no request succeeded.
    '''

    url_template = 'https://apipubaws.tcbs.com.vn/tcanalysis/v1/rating/{}/{}?fType=TICKER'
    payloads = []  # raw JSON response bodies, parsed together below
    failed = []    # tickers whose request failed

    for ticker in tickers:
        try:
            response = requests.get(url_template.format(ticker, rating_type), timeout=timeout)
            # Error bodies used to be unioned into the result; skip them instead.
            response.raise_for_status()
        except requests.RequestException:
            failed.append(ticker)
            continue
        payloads.append(response.text)

    if failed:
        print('get_rating({!r}): {} request(s) failed, e.g. {}'.format(
            rating_type, len(failed), failed[:10]))
    if not payloads:
        raise RuntimeError('get_rating({!r}): no data fetched for any ticker'.format(rating_type))

    # Parse every body in a single read rather than unioning one DataFrame per
    # ticker, which grew the query plan with every ticker.
    df = spark.read.json(spark.sparkContext.parallelize(payloads))

    if rating_type == 'general':
        df = df.drop(col('stockRecommend'))

    #Clean
    df = df.dropDuplicates(["ticker"])
    df = df.orderBy(col("ticker").asc())

    #Write to db
    data_to_db(df, table)
    return df

# test = get_rating('general', 'general_rating')

In [66]:
def clean_time(df, column):
    '''
    Normalise an ISO-8601-style timestamp string column.

    Replaces every "T" with a space and removes a "." together with everything
    after it (fractional seconds and any suffix). For example,
    "2022-10-03T09:15:00.000Z" becomes "2022-10-03 09:15:00". The column stays a
    string column.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
    column : str
        Name of the column to read and overwrite.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with ``column`` replaced by the cleaned value.
    '''
    # Previously this always read 'tradingDate', ignoring ``column``.
    cleaned = regexp_replace(col(column), 'T', ' ')
    # Raw string: '\.' in a normal literal is an invalid escape (warning on 3.12+).
    cleaned = regexp_replace(cleaned, r'\.(.+)', '')
    return df.withColumn(column, cleaned)

In [67]:
def get_stock_history(start_date, end_date, table='stock_history', timeout=30):
    '''
    Fetch daily bars (resolution=D) for every ticker between two dates, write them
    to Postgres and return them.

    Parameters
    ----------
    start_date, end_date : str
        "YYYY-MM-DD" strings, converted to Unix seconds in the machine's local
        timezone (``time.mktime``).
    table : str, optional
        Destination table; defaults to 'stock_history' as before.
    timeout : float, optional
        Per-request timeout in seconds, so one stalled request cannot hang the run.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per returned bar plus a ``ticker`` column. ``tradingDate`` is
        normalised by ``clean_time`` and renamed to ``timeStamp``.

    Raises
    ------
    RuntimeError
        If no bars were fetched for any ticker.
    '''
    #Convert date to timestamp
    fd = int(time.mktime(time.strptime(start_date, "%Y-%m-%d")))
    td = int(time.mktime(time.strptime(end_date, "%Y-%m-%d")))

    url = 'https://apipubaws.tcbs.com.vn/stock-insight/v1/stock/bars-long-term'
    records = []  # one JSON string per bar, parsed together below
    failed = []   # tickers whose request or payload failed

    for ticker in tickers:
        try:
            response = requests.get(
                url,
                params={'ticker': ticker, 'type': 'stock', 'resolution': 'D', 'from': fd, 'to': td},
                timeout=timeout,
            )
            response.raise_for_status()
            # Serialise each bar with json.dumps. PySpark str()s non-string RDD
            # elements, and a Python dict repr (None/True/False) is not valid JSON.
            # Setting 'ticker' here overrides any existing key, as withColumn did.
            bars = [json.dumps({**bar, 'ticker': ticker}) for bar in response.json()['data']]
        except (requests.RequestException, ValueError, KeyError, TypeError):
            failed.append(ticker)
            continue
        records.extend(bars)

    if failed:
        print('get_stock_history: {} ticker(s) failed, e.g. {}'.format(len(failed), failed[:10]))
    if not records:
        raise RuntimeError('get_stock_history: no bars fetched for any ticker')

    # Single read over all bars rather than a per-ticker union that grew the plan.
    df = spark.read.json(spark.sparkContext.parallelize(records))

    #clean
    df = clean_time(df, 'tradingDate')
    df = df.withColumnRenamed('tradingDate', 'timeStamp')

    #Write to db
    data_to_db(df, table)
    return df

# get_stock_history("2022-10-01", "2022-11-22").show(truncate = False)

In [68]:
def get_intraday_transaction(table='intraday_transaction', timeout=30):
    '''
    Fetch up to 1000 intraday trades per ticker, write them to Postgres and return them.

    On Saturday/Sunday the request also sends ``headIndex=-1``, as before. This
    presumably returns the previous trading session; NOTE(review): confirm with
    the API. Trade times in ``t`` are prefixed with the session date. Columns are
    then renamed p -> price, v -> volume, t -> timeStamp, and a ``ticker`` column
    is added.

    Parameters
    ----------
    table : str, optional
        Destination table. This was hard-coded to 'listing_companies', so every run
        overwrote the company listing. NOTE(review): confirm the intended table name.
    timeout : float, optional
        Per-request timeout in seconds, so one stalled request cannot hang the run.

    Returns
    -------
    pyspark.sql.DataFrame

    Raises
    ------
    RuntimeError
        If no trades were fetched for any ticker.
    '''
    now = datetime.now()
    is_weekend = now.weekday() > 4
    # Date prefixed onto each trade time, as "YYYY-MM-DD". On weekends, roll back to
    # Friday; stamping the fetched trades with Saturday/Sunday named a day with no
    # session. NOTE(review): exchange holidays are not handled.
    session_date = (now.date() - timedelta(days=max(now.weekday() - 4, 0))).isoformat()

    # The former weekday and weekend branches differed only in headIndex. These
    # params produce the same query strings.
    url_template = 'https://apipubaws.tcbs.com.vn/stock-insight/v1/intraday/{}/his/paging'
    params = {'page': 0, 'size': 1000}
    if is_weekend:
        params['headIndex'] = -1

    #Create blank dataframe
    df = spark.createDataFrame([], StructType([]))
    failed = []

    for ticker in tickers:
        try:
            response = requests.get(url_template.format(ticker), params=params, timeout=timeout)
            response.raise_for_status()
            trades = spark.createDataFrame(json_normalize(response.json()['data']))
            trades = trades.withColumn('ticker', lit(ticker))
            df = df.unionByName(trades, allowMissingColumns=True)
        except Exception:
            # Best-effort per ticker, as before, but failures are now reported below.
            failed.append(ticker)

    if failed:
        print('get_intraday_transaction: {} ticker(s) failed, e.g. {}'.format(len(failed), failed[:10]))
    if not df.columns:
        raise RuntimeError('get_intraday_transaction: no trades fetched for any ticker')

    #Clean
    df = df.withColumn('t', concat_ws(' ', lit(session_date), col('t')))
    df = (df.withColumnRenamed("p", "price")
            .withColumnRenamed("v", "volume")
            .withColumnRenamed("t", "timeStamp"))

    #Write to db
    data_to_db(df, table)
    return df
    
# get_intraday_transaction().show(100000)

In [69]:
def get_listing_companies(table='listing_companies', timeout=30):
    '''
    Fetch each ticker's TCBS overview, write it to Postgres and return it.

    Parameters
    ----------
    table : str, optional
        Destination table. This was hard-coded to 'stock_history', so every run
        overwrote the output of ``get_stock_history``.
    timeout : float, optional
        Per-request timeout in seconds, so one stalled request cannot hang the run.

    Returns
    -------
    pyspark.sql.DataFrame
        The parsed overview responses.

    Raises
    ------
    RuntimeError
        If no request succeeded.
    '''
    url_template = 'https://apipubaws.tcbs.com.vn/tcanalysis/v1/ticker/{}/overview'
    payloads = []  # raw JSON response bodies, parsed together below
    failed = []    # tickers whose request failed

    for ticker in tickers:
        try:
            response = requests.get(url_template.format(ticker), timeout=timeout)
            # Error bodies used to be unioned into the result; skip them instead.
            response.raise_for_status()
        except requests.RequestException:
            failed.append(ticker)
            continue
        payloads.append(response.text)

    if failed:
        print('get_listing_companies: {} request(s) failed, e.g. {}'.format(len(failed), failed[:10]))
    if not payloads:
        raise RuntimeError('get_listing_companies: no data fetched for any ticker')

    # Single read over all bodies rather than a per-ticker union that grew the plan.
    df = spark.read.json(spark.sparkContext.parallelize(payloads))

    #Write to db
    data_to_db(df, table)
    return df

# get_listing_companies().show(1000, truncate=False)

In [None]:
# Scratch: read the file "foo", split its text on "_", strip and capitalize each
# part, and print the parts joined with ". " (e.g. "hello_world" -> "Hello. World").
with open("foo", "r") as f:
    string = f.read()
# Previously this referenced an undefined name `x` (NameError); use the text just read.
print('. '.join(part.strip().capitalize() for part in string.split('_')))