In [1]:
import os
import sys
import json
import codecs

import time
from datetime import datetime

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import SparkFiles
from pyspark.sql.functions import col, lit, length, row_number, when
from pyspark.sql.functions import upper, lower

In [3]:
from lib import spark_utils

In [4]:
spark = spark_utils.get_spark()

25/03/24 16:59:54 WARN Utils: Your hostname, Mac-MD2XX1D4WV.local resolves to a loopback address: 127.0.0.1; using 192.168.11.215 instead (on interface en0)
25/03/24 16:59:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/24 16:59:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

In [6]:
import os
import glob
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, length, row_number, when
from pyspark.sql.functions import avg, count, sum
from pyspark.sql.functions import upper, lower
from pyspark.sql.functions import substring, split
from pyspark.sql import functions
from pyspark.sql.types import IntegerType, StringType, LongType
from pyspark.sql.types import StructType, StructField
from pyspark.sql.functions import udf

In [7]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 1000000)
pd.set_option('display.width', 4000)

In [11]:
def cast_to_int_with_default(value):
    try:
        if value is None:
            return 0  # or any other default value you prefer
        else:
            return int(value)
    except ValueError:
        return 0


udf_cast_to_int_with_default = udf(cast_to_int_with_default, IntegerType())


In [12]:
def get_joined_data_with_qt(spark, input_path, _year, _qurt):
    coverpage_ = spark.read.option(
        'delimiter', '\t').option(
        'header', True).csv(
        input_path % (_year, _qurt, 'COVERPAGE.tsv'))

    coverpage_ = coverpage_.withColumns({'YEAR': lit(_year), 'QUARTER': lit(_qurt)})

    infotable_ = spark.read.option(
        'delimiter', '\t').option(
        'header', True).csv(
        input_path % (_year, _qurt, 'INFOTABLE.tsv'))

    # cusup 合法性检查
    infotable_ = infotable_.withColumn('CUSIP', lower(col('CUSIP')))
    infotable_ = infotable_.filter((infotable_['SSHPRNAMTTYPE'] == 'SH') &
                                   (infotable_['PUTCALL'].isNull()) &
                                   (infotable_['CUSIP'] != '000000000') &
                                   (infotable_['CUSIP'] != '0000000na'))
    infotable_ = infotable_.filter(length(infotable_['CUSIP']) == 9)

    # 仅选取普通股
    infotable_ = infotable_.filter(upper(col("TITLEOFCLASS")).contains("COM"))
    infotable_ = infotable_.withColumns({
        'YEAR': lit(_year), 'QUARTER': lit(_qurt)})

    summarypage_ = spark.read.option('delimiter', '\t').option(
        'header', True).csv(input_path % (_year, _qurt, 'SUMMARYPAGE.tsv'))

    summarypage_ = summarypage_.withColumns({
        'YEAR': lit(_year),
        'QUARTER': lit(_qurt)})

    submission_ = spark.read.option(
        'delimiter', '\t').option(
        'header', True).csv(input_path % (_year, _qurt, 'SUBMISSION.tsv'))
    submission_ = submission_.withColumns({'YEAR': lit(_year), 'QUARTER': lit(_qurt)})

    joined_ = infotable_.join(
        coverpage_, (infotable_['ACCESSION_NUMBER'] == coverpage_['ACCESSION_NUMBER']) &
        (infotable_['YEAR'] == coverpage_['YEAR']) & (infotable_['QUARTER'] == coverpage_['QUARTER']),
        'left'
    ).join(
        summarypage_, (infotable_['ACCESSION_NUMBER'] == summarypage_['ACCESSION_NUMBER']) &
        (infotable_['YEAR'] == summarypage_['YEAR']) & (infotable_['QUARTER'] == summarypage_['QUARTER']),
        'left'
    ).join(
        submission_, (infotable_['ACCESSION_NUMBER'] == submission_['ACCESSION_NUMBER']) &
        (infotable_['YEAR'] == submission_['YEAR']) & (infotable_['QUARTER'] == submission_['QUARTER']),
        'left'
    ).drop(
        coverpage_['YEAR'], coverpage_['QUARTER'], coverpage_['ACCESSION_NUMBER'],
        summarypage_['YEAR'], summarypage_['QUARTER'], summarypage_['ACCESSION_NUMBER'],
        submission_['YEAR'], submission_['QUARTER'], submission_['ACCESSION_NUMBER'])

    joined_ = joined_.withColumns({
        'NAMEOFISSUER': upper(col('NAMEOFISSUER')),
        'FILINGMANAGER_NAME': upper(col('FILINGMANAGER_NAME')),
        'VALUE': udf_cast_to_int_with_default(col('VALUE')),
    })

    # 需要有一个去重判断,投资机构可能多次上传,13HR,13HR-A,...
    window_spec = Window.partitionBy(
        'CUSIP', 'FILINGMANAGER_NAME'
    ).orderBy(col('FILING_DATE').desc(), col('VALUE').desc())

    joined_ = joined_.withColumn('row_number', row_number().over(window_spec))
    joined_ = joined_.filter(col('row_number') == 1).drop('row_number')

    return joined_



In [13]:
def transfer_standard_unit(filter_data, year):
    filter_data = filter_data.withColumns({
        'VALUE': udf_cast_to_int_with_default(col('VALUE')),
        'SSHPRNAMT': udf_cast_to_int_with_default(col('SSHPRNAMT')),
    })
    # 2022年及以前的VALUE单位为千$，2023年及之后为$(实际从1月3号开始)
    filter_data = filter_data.withColumn(
        'VALUE', when(lit(year) >= 2023, col('VALUE')).otherwise(col('VALUE')*1000))
    return filter_data



In [14]:
def filter_data_by_share_value(filter_data, year):
    # 对空值或负值填0
    filter_data = filter_data.withColumns({
        "VALUE": when(col("VALUE").isNull() | (col("VALUE") < 0), 0).otherwise(col("VALUE")),
        "SSHPRNAMT": when(col("SSHPRNAMT").isNull() | (col("SSHPRNAMT") < 0), 0).otherwise(col("SSHPRNAMT")),
    })
    # 以股票为Key统计总被交易价值、效果量
    ticker_value = filter_data.groupby(['CUSIP']).agg(
        sum(col('VALUE')).alias('VALUE'),
        sum(col('SSHPRNAMT')).alias('SSHPRNAMT')
    ).withColumn(
        'VPSSH', col('VALUE') / col('SSHPRNAMT')
    ).filter(
        (col('VPSSH') > 0.00000000000001) & (col('VPSSH') < 100000000000.)
    )

    ticker_value = ticker_value.withColumnRenamed(
        'CUSIP', 'CUSIP_ticker'
    ).withColumnRenamed(
        'VALUE', 'VALUE_ticker'
    ).withColumnRenamed(
        'SSHPRNAMT', 'SSHPRNAMT_ticker'
    )

    # 如果基金持有成本明显偏离总平均成本，认为是脏数据，过滤掉
    filter_data = filter_data.join(
        ticker_value, filter_data['CUSIP'] == ticker_value['CUSIP_ticker'], 'left'
    ).drop('CUSIP_ticker', 'VALUE_ticker', 'SSHPRNAMT_ticker')

    filter_data = filter_data.filter(
        (col('VALUE') / col('SSHPRNAMT') > 0.1 * col('VPSSH'))
        & (col('VALUE') / col('SSHPRNAMT') < 10 * col('VPSSH')))

    filter_data = filter_data.drop('VPSSH')

    return filter_data


In [15]:
def get_manager_level(args, config, spark, logger):
    # 获取基金的历史体量数据，依赖上一年的数据产出,所有跑数据要从小年份开始跑
    input_data_dirs = []
    for stat_year in args.manager_level_year.split(','):
        input_data_dirs += glob.glob('{base_dir}/{year}q*_form13f/manager/'.format(
            base_dir=args.output_base_dir, year=int(stat_year)))

    logger.info('get_manager_level from %s' % str(input_data_dirs))
    schema = StructType([
        StructField("FILINGMANAGER_NAME", StringType(), nullable=True),
        StructField("MANAGER_VALUE", IntegerType(), nullable=True),
        StructField("MANAGER_PREVALUE", IntegerType(), nullable=True),
        StructField("MANAGER_LEVEL", IntegerType(), nullable=True),
        StructField("YEAR", IntegerType(), nullable=True),
        StructField("QUARTER", IntegerType(), nullable=True)
    ])
    #
    data = spark.read.schema(schema).option('delimiter', '\t').option('header', True).csv(input_data_dirs)
    #
    data = data.groupby(['FILINGMANAGER_NAME']).agg(avg(col('MANAGER_VALUE')).alias('MANAGER_VALUE'))

    quantiles = data.approxQuantile("MANAGER_VALUE", config.percentile_tiers, 0.01)
    logger.info('quantiles: %s' % str(quantiles))

    data = data.withColumn("MANAGER_LEVEL",
                           when(col("MANAGER_VALUE") > quantiles[0], 0)
                           .when(col("MANAGER_VALUE") > quantiles[1], 1)
                           .when(col("MANAGER_VALUE") > quantiles[2], 2)
                           .when(col("MANAGER_VALUE") > quantiles[3], 3)
                           .when(col("MANAGER_VALUE") >= quantiles[4], 4)
                           .otherwise(5))   # 5: 采用固定数据后有些ManagerLevel找不到，归入5

    return data

In [17]:
import logging
manager_history = get_manager_level(spark, logging)  # 这份数据中有(基金-基金持仓总体量)

TypeError: get_manager_level() missing 2 required positional arguments: 'spark' and 'logger'

In [48]:
manager_history.limit(3).toPandas()

Unnamed: 0,FILINGMANAGER_NAME,MANAGER_VALUE,MANAGER_LEVEL
0,"ARDENT CAPITAL MANAGEMENT, INC.",71332040.0,3
1,ASPIRE WEALTH MANAGEMENT CORP,10031790.0,4
2,CANYON CAPITAL ADVISORS LLC,1120701000.0,1


In [15]:
input_path = '/Users/liuda/Library/CloudStorage/Dropbox/shareit/code/trading/data/hedge/%sq%s_form13f/%s'
cur_year='2022'
cur_qurt='2'
pre_year='2022'
pre_qurt='1'

In [69]:
joined_cur = get_joined_data_with_qt(spark, input_path, cur_year, cur_qurt)

In [70]:
joined_cur = transfer_standard_unit(joined_cur, cur_year)

In [71]:
joined_cur = filter_data_by_share_value(joined_cur, cur_year)

In [73]:
joined_cur = joined_cur.join(
    manager_history,
    (joined_cur['FILINGMANAGER_NAME'] == manager_history['FILINGMANAGER_NAME']),
    'left'
).drop(manager_history['FILINGMANAGER_NAME'])

In [74]:
joined_cur = joined_cur.filter((col('CUSIP') == '874039100') & (col('MANAGER_LEVEL') == 2))

In [75]:
joined_cur.groupby(['YEAR', 'QUARTER', 'MANAGER_LEVEL']).agg({'VALUE': 'sum'}).toPandas()

                                                                                

Unnamed: 0,YEAR,QUARTER,MANAGER_LEVEL,sum(VALUE)
0,2022,2,2,128295000


In [57]:
joined_pre = get_joined_data_with_qt(spark, input_path, pre_year, pre_qurt)   # 过滤CUSIP

# 统一VALUE单位
joined_pre = transfer_standard_unit(joined_pre, pre_year)

# 统计股票的均价，如果基金成本明显异于均价，认为是异常值,过滤
joined_pre = filter_data_by_share_value(joined_pre, pre_year)

joined_pre = joined_pre.select(['NAMEOFISSUER', 'FILINGMANAGER_NAME', 'CUSIP',
                                'YEAR', 'QUARTER', 'VALUE',
                                'SSHPRNAMTTYPE', 'SSHPRNAMT',
                                'FILING_DATE'])


joined_pre = joined_pre.join(
    manager_history,
    (joined_pre['FILINGMANAGER_NAME'] == manager_history['FILINGMANAGER_NAME']),
    'left'
).drop(manager_history['FILINGMANAGER_NAME'])

joined_pre = joined_pre.withColumnRenamed(
    "NAMEOFISSUER", "PRENAMEOFISSUER").withColumnRenamed(
    "FILINGMANAGER_NAME", "PREFILINGMANAGER_NAME").withColumnRenamed(
    "CUSIP", "PRECUSIP").withColumnRenamed(
    "VALUE", "PREVALUE").withColumnRenamed(
    'YEAR', 'PREYEAR').withColumnRenamed(
    'QUARTER', 'PREQUARTER').withColumnRenamed(
    'SSHPRNAMTTYPE', 'PRESSHPRNAMTTYPE').withColumnRenamed(
    'SSHPRNAMT', 'PRESSHPRNAMT').withColumnRenamed(
    'FILING_DATE', 'PREFILING_DATE').withColumnRenamed(
    'MANAGER_LEVEL', 'PREMANAGER_LEVEL')

In [76]:
joined_pre = joined_pre.filter((col('CUSIP') == '874039100') & (col('MANAGER_LEVEL') == 2))

In [77]:
joined_pre.groupby(['PREYEAR', 'PREQUARTER', 'PREMANAGER_LEVEL']).agg({'PREVALUE': 'sum'}).toPandas()

                                                                                

Unnamed: 0,PREYEAR,PREQUARTER,PREMANAGER_LEVEL,sum(PREVALUE)
0,2022,1,2,148828000


In [78]:
joined_data = joined_cur.join(
    joined_pre, (joined_cur['FILINGMANAGER_NAME'] == joined_pre['PREFILINGMANAGER_NAME']) &
    (joined_cur['CUSIP'] == joined_pre['PRECUSIP']),
    'outer'  # 这里因为要求PREVALUE，所以应该为outer
)

In [80]:
joined_data.groupby(['YEAR', 'QUARTER', 'MANAGER_LEVEL', 'PREMANAGER_LEVEL', 'PREYEAR', 'PREQUARTER']).agg(
    sum('VALUE').alias('VALUE'), 
    sum('PREVALUE').alias('PREVALUE')
).toPandas()

                                                                                

Unnamed: 0,YEAR,QUARTER,MANAGER_LEVEL,PREMANAGER_LEVEL,PREYEAR,PREQUARTER,VALUE,PREVALUE
0,,,,2.0,2022.0,1.0,,6134000.0
1,2022.0,2.0,2.0,,,,1696000.0,
2,2022.0,2.0,2.0,2.0,2022.0,1.0,126599000.0,142694000.0
