This notebook is for the development and testing of the PySpark scripts in the `./scripts` directory.

In [23]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, datediff, current_date, round, dense_rank, when
import os

# Reuse the active Spark session if there is one; otherwise start a session
# against the 'local[4]' master (local mode, four worker threads).
local_builder = SparkSession.builder.master('local[4]')
spark = local_builder.getOrCreate()

In [None]:
# Get input file sizes in MB
# Prints every entry of the input directory with its size, then the combined
# size of all entries.
input_dir = '../corporate_uk'
total = 0
for file in os.listdir(input_dir):
    # Stat each entry once and reuse the value for the printout and the total.
    size_mb = os.path.getsize(os.path.join(input_dir, file)) / (1024**2)
    print(file, size_mb)
    total += size_mb
print(total)

In [None]:
# Load the four input tables. All files are semicolon-delimited CSVs with a
# header row; no schema is supplied, so the numeric columns used by later
# cells are cast explicitly here.
csv_options = {'delimiter': ';', 'header': 'true'}

companies = (
    spark.read.options(**csv_options)
    .csv('../corporate_uk/companies.csv')
    .withColumn('current_assets', col('current_assets').cast('double'))
    .withColumn(
        'average_number_employees_during_period',
        col('average_number_employees_during_period').cast('double'),
    )
    # The literal string 'Null' in account_type is replaced with a real null.
    .withColumn(
        'account_type',
        when(col('account_type') == 'Null', None).otherwise(col('account_type')),
    )
)

sic_codes = spark.read.options(**csv_options).csv('../corporate_uk/companies_sic_codes.csv')

filings = (
    spark.read.options(**csv_options)
    .csv('../corporate_uk/filings.csv')
    .withColumn('pages', col('pages').cast('int'))
)

owners = spark.read.options(**csv_options).csv('../corporate_uk/officers_and_owners.csv')

In [None]:
# Number of companies per company_status, largest group first.
status_counts = companies.groupBy('company_status').count()
status_counts.orderBy(col('count').desc()).show(truncate=False)

In [None]:
# Which account type is most common in each jurisdiction?
# Count companies per (jurisdiction, account_type), rank the counts within
# each jurisdiction and keep rank 1. dense_rank keeps every account type that
# ties for first place.
w = Window.partitionBy('jurisdiction').orderBy(col('count').desc())
has_both_values = col('jurisdiction').isNotNull() & col('account_type').isNotNull()
(
    companies.where(has_both_values)
    .groupBy('jurisdiction', 'account_type')
    .count()
    .withColumn('rnk', dense_rank().over(w))
    .filter(col('rnk') == 1)
    .drop('rnk')
    .orderBy('jurisdiction')
    .show(truncate=False)
)

In [None]:
# The ten most frequent (sic_section, sic_division) combinations.
division_counts = sic_codes.groupBy('sic_section', 'sic_division').count()
division_counts.orderBy(col('count').desc()).limit(10).show(truncate=False)

In [None]:
# Average company age in years per company_type, lowest average first.
# Age is the number of days between incorporation_date and today, divided by
# 365.25. `round` is the pyspark.sql.functions version imported at the top of
# the notebook, not the Python builtin.
age_in_years = datediff(current_date(), col('incorporation_date')) / 365.25
(
    companies.withColumn('age', age_in_years)
    .select('company_type', 'age')
    .groupBy('company_type')
    .avg('age')
    .withColumn('average_age', round(col('avg(age)'), 2))
    .drop('avg(age)')
    .orderBy('average_age')
    .show(truncate=False)
)

In [None]:
# Distinct names of companies whose next accounts or confirmation statement
# are flagged as overdue. The `== True` comparisons build Spark column
# expressions, so they must not be rewritten as `is True`.
accounts_overdue = companies['next_accounts_overdue'] == True
statement_overdue = companies['confirmation_statement_overdue'] == True
overdue_companies = companies.where(accounts_overdue | statement_overdue)
overdue_companies.select('company_name').distinct().show()

In [None]:
# Share of each account_type as a percentage of all rows in `companies`
# (rounded to three decimals), largest share first.
n_companies = companies.count()
type_counts = companies.groupBy('account_type').count()
(
    type_counts.withColumn('percentage', round(100 * col('count') / n_companies, 3))
    .orderBy(col('percentage').desc())
    .show(truncate=False)
)

In [None]:
# Which SIC codes have the highest current assets on average?
# Companies are left-joined to their SIC rows on company_number; the average
# is then taken per sic_description and shown highest first.
companies_with_sic = companies.join(sic_codes, on='company_number', how='left')
(
    companies_with_sic.select(
        companies.company_number,
        sic_codes.sic_code,
        sic_codes.sic_description,
        companies.current_assets,
    )
    .groupBy('sic_description')
    .avg('current_assets')
    .orderBy(col('avg(current_assets)').desc())
    .show(truncate=False)
)

In [None]:
# Average of average_number_employees_during_period per company_type,
# reported as an integer column named avg_employees.
raw_avg_column = 'avg(average_number_employees_during_period)'
(
    companies.groupBy('company_type')
    .avg('average_number_employees_during_period')
    .withColumn('avg_employees', col(raw_avg_column).cast('int'))
    .drop(raw_avg_column)
    .show(truncate=False)
)

In [None]:
# Number of owner rows (is_owner is true) per company_country, largest first.
owner_rows = owners.where(owners.is_owner == True)
owner_rows.groupBy('company_country').count().orderBy(col('count').desc()).show()


In [None]:
# How often each officer_role occurs for each company_type. Rows of `owners`
# are left-joined to `companies` on company_number.
same_company = owners.company_number == companies.company_number
role_counts = (
    owners.join(companies, on=same_company, how='left')
    .groupBy(companies.company_type, owners.officer_role)
    .count()
)
role_counts.orderBy(col('count').desc()).show(truncate=False)

In [None]:
# Number of rows in `owners` per company, shown with the company name and
# sorted from the highest count down.
rows_per_company = owners.groupBy('company_number').count()
(
    rows_per_company.join(companies, on='company_number', how='left')
    .select('company_name', 'count')
    .orderBy(col('count').desc())
    .show(truncate=False)
)

In [None]:
# Number of filings per (category, description) pair, most frequent first.
filing_kind_counts = filings.groupBy('category', 'description').count()
filing_kind_counts.orderBy(col('count').desc()).show(truncate=False)

In [None]:
# Total number of filed pages per company, shown with the company name and
# sorted from the largest total down.
pages_per_company = filings.groupBy('company_number').sum('pages')
(
    pages_per_company.join(companies, on='company_number', how='left')
    .select('company_name', 'company_number', 'sum(pages)')
    .orderBy(col('sum(pages)').desc())
    .show(truncate=False)
)

In [None]:
# Fraction of all filings whose date equals the incorporation_date of the
# company they belong to. Each filing gets a 1/0 flag; the flags are summed
# over the whole table and divided by the number of filings.
filed_on_incorporation_day = (col('date') == col('incorporation_date')).cast('int')
flag_total = (
    filings.join(companies, on='company_number', how='left')
    .select('company_name', 'date', 'incorporation_date')
    .withColumn('flag_incorporation_file', filed_on_incorporation_day)
    .groupBy()
    .sum('flag_incorporation_file')
)
# The global aggregate yields a single row with a single column.
incorporation_files = flag_total.first()[0]
incorporation_files / filings.count()

In [None]:
# Stop the Spark session created at the top of the notebook; cells that use
# `spark` or the DataFrames cannot be re-run after this without recreating it.
spark.stop()