In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

## EXPLORATION of df1

In [5]:
df1 = spark.read.csv('s3://sec-finc/t1/',header = False)

In [6]:
df1.show()

+--------------------+
|                 _c0|
+--------------------+
|147793216011396	0...|
|147793216011552	0...|
|147793216012172	0...|
|147793216012878	0...|
|147793216013334	0...|
|147793216013650	0...|
|147793216013919	0...|
|147793217000195	0...|
|147793217000472	0...|
|147793217000749	0...|
|147793217000776	0...|
|147793217001513	0...|
|147793217001669	0...|
|147793217001708	0...|
|147793217001790	0...|
|147793217002410	0...|
|147793217002414	0...|
|147793217002566	0...|
|147793217002839	0...|
|147793217003607	0...|
+--------------------+
only showing top 20 rows



In [12]:
from pyspark.sql.functions import split, explode,col
col_names = ['accession_number_int','accession_number','cik','company_name',
             'filing_date','document_type','document_period_end_date','current_fiscal_year_end_date',
             'document_fiscal_year_focus','document_fiscal_period_focus','current_fiscal_year_end_month',
             'amendment_flag','assigned_sic','irs_number','state_of_incorporation','mailing_address_street1',
             'mailing_address_street2','mailing_address_city','mailing_address_state','mailing_address_zip',
             'business_address_street1','business_address_street2','business_address_city','business_address_state',
             'business_address_zip','mailing_phone_number','business_phone_number']
for i in range(0,len(col_names)):
    df1 = df1.withColumn(col_names[i], split(col("_c0"), "\t")[i]).cache()

In [14]:
df1 = df1.drop('_c0')
df1.show()

+--------------------+--------------------+-------+--------------------+-----------+-------------+------------------------+----------------------------+--------------------------+----------------------------+-----------------------------+--------------+------------+----------+----------------------+-----------------------+-----------------------+--------------------+---------------------+-------------------+------------------------+------------------------+---------------------+----------------------+--------------------+--------------------+---------------------+
|accession_number_int|    accession_number|    cik|        company_name|filing_date|document_type|document_period_end_date|current_fiscal_year_end_date|document_fiscal_year_focus|document_fiscal_period_focus|current_fiscal_year_end_month|amendment_flag|assigned_sic|irs_number|state_of_incorporation|mailing_address_street1|mailing_address_street2|mailing_address_city|mailing_address_state|mailing_address_zip|business_address_s

In [15]:
df1.take(5)

[Row(accession_number_int='147793216011396', accession_number='0001477932-16-011396', cik='1528697', company_name='BOOKEDBYUS INC.', filing_date='2016-07-15', document_type='10-Q', document_period_end_date='2016-05-31', current_fiscal_year_end_date='--08-31', document_fiscal_year_focus='2016', document_fiscal_period_focus='Q3', current_fiscal_year_end_month='8', amendment_flag='false', assigned_sic='5045', irs_number='261679929 ', state_of_incorporation='NV', mailing_address_street1='619 S. RIDGELEY DRIVE', mailing_address_street2='\\N', mailing_address_city='LOS ANGELES', mailing_address_state='CA', mailing_address_zip='90036', business_address_street1='619 S. RIDGELEY DRIVE', business_address_street2='\\N', business_address_city='LOS ANGELES', business_address_state='CA', business_address_zip='90036', mailing_phone_number='\\N', business_phone_number='(323) 634-10'),
 Row(accession_number_int='147793216011552', accession_number='0001477932-16-011552', cik='1532926', company_name='BRK

In [16]:
df1.createOrReplaceTempView('company_submission')

In [17]:
## number of companies:
spark.sql('''
    SELECT count(distinct a.company_name)
    FROM company_submission as a
''').show()

+----------------------------+
|count(DISTINCT company_name)|
+----------------------------+
|                       15807|
+----------------------------+



In [21]:
## type of doc
spark.sql('''
    SELECT a.document_type,count(distinct a.company_name) as cnt
    FROM company_submission as a
    group by a.document_type
    order by cnt DESC
''').show()

+-------------+----+
|document_type| cnt|
+-------------+----+
|         10-Q|9318|
|         10-K|8423|
|         null|5330|
|       10-Q/A|3103|
|       10-K/A|1558|
|         20-F| 847|
|       20-F/A| 338|
|         40-F| 149|
|        10-KT| 124|
|       40-F/A|  33|
|        10-QT|  27|
|      10-KT/A|  11|
|        424B3|   6|
|     10-12G/A|   3|
|      10-QT/A|   3|
|      DEF 14A|   1|
|         10-D|   1|
|       10-12G|   1|
+-------------+----+



In [24]:
## 10-K yearly submission
spark.sql('''
    SELECT a.document_fiscal_year_focus,count(distinct a.company_name) as cnt
    FROM company_submission as a
    WHERE document_type == '10-K'
    group by a.document_fiscal_year_focus
    order by a.document_fiscal_year_focus DESC
''').show()


+--------------------------+----+
|document_fiscal_year_focus| cnt|
+--------------------------+----+
|                        \N| 389|
|                     43465|   1|
|                      2107|   1|
|                      2020|   1|
|                      2019| 394|
|                      2018|3736|
|                      2017|3955|
|                      2016|4135|
|                      2015|4410|
|                      2014|4707|
|                      2013|4923|
|                      2012|5082|
|                      2011|4630|
|                      2010|1135|
|                      2009|   3|
|                      2001|   1|
+--------------------------+----+



In [None]:
***data cleaning***

## EXPLORATION of df3

In [29]:
df3 = spark.read.text('s3://sec-finc/t3/')

In [30]:
df3.show()

+--------------------+
|               value|
+--------------------+
|702325	7023251700...|
|702325	7023251700...|
|39368	14377491701...|
|702325	7023251700...|
|39368	14377491701...|
|1008586	162828016...|
|1008586	162828016...|
|1326089	155837018...|
|1326089	155837018...|
|1326089	155837018...|
|1681903	168190318...|
|1272830	127283017...|
|1272830	127283017...|
|1681903	168190318...|
|1681903	168190318...|
|1681903	168190318...|
|1326089	155837018...|
|1681903	168190318...|
|1326089	155837018...|
|1503636	144530513...|
+--------------------+
only showing top 20 rows



In [31]:
col_names2 = ['cik','accession_number_int','filing_date','datapoint_id','datapoint_name','version',
              'segment_label','segment_hash','start_date','end_date','period_month','string_value','numeric_value',
              'decimals','unit','footnotes']
for i in range(0,len(col_names2)):
    df3 = df3.withColumn(col_names2[i], split(col("value"), "\t")[i]).cache()

In [32]:
df3 = df3.drop('value')

In [None]:
df3.show()

In [None]:
df3.take(5)

In [None]:
df3.createOrReplaceTempView('data_point')

## Part 1. Datasets Exploration

```
requirment
Explore, assess and visualize the data. 
Aggregate, count, and summarize. 
```

In [None]:
## Read in
## parse or reformat


In [None]:
#one by one explore, 
#e.g. for company_submission, print the year range, how many company submitted, the table of doc_type & frequency

#select subsets and join them together
#e.g. table#3&4, data points and report_presentation_line_item


In [None]:
## redundancy regarding to the data points
## count nulls


In [None]:
## Provide potential next steps, 
## e.g. what relationship can be illustrated in the plot, for example the #of companies against yr