Importing libraries

In [None]:
import findspark
findspark.init('/usr/spark2.4.3')

In [12]:
from pyspark.sql import SparkSession

In [13]:
ss = SparkSession.builder.appName("test").getOrCreate()

In [14]:
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.window import *
from pyspark.sql.types import *

In [15]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

Loading data

In [36]:
file_location = "/user/kolpurath6035/registered_companies.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = ss.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
#.show(n =5, truncate = False)


Preprocessing

In [9]:
# null values
column_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

rows = column_counts.collect()
for row in rows:
    for column, null_count in row.asDict().items():
        print(f"{column}: {null_count}")
    print()


CORPORATE_IDENTIFICATION_NUMBER: 0
COMPANY_NAME: 0
COMPANY_STATUS: 0
COMPANY_CLASS: 5078
COMPANY_CATEGORY: 5085
COMPANY_SUB_CATEGORY: 5090
date_of_registration: 2525
REGISTERED_STATE: 0
AUTHORIZED_CAP: 0
PAIDUP_CAPITAL: 2075
INDUSTRIAL_CLASS: 4811
PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN: 12
REGISTERED_OFFICE_ADDRESS: 15259
REGISTRAR_OF_COMPANIES: 42014
EMAIL_ADDR: 369507
latest_year_annual_return: 838571
LATEST_YEAR_FINANCIAL_STATEMENT: 827419



In [45]:
# data types of each columns
df.printSchema()

root
 |-- CORPORATE_IDENTIFICATION_NUMBER: string (nullable = true)
 |-- COMPANY_NAME: string (nullable = true)
 |-- COMPANY_STATUS: string (nullable = true)
 |-- COMPANY_CLASS: string (nullable = true)
 |-- COMPANY_CATEGORY: string (nullable = true)
 |-- COMPANY_SUB_CATEGORY: string (nullable = true)
 |-- date_of_registration: string (nullable = true)
 |-- REGISTERED_STATE: string (nullable = true)
 |-- AUTHORIZED_CAP: string (nullable = true)
 |-- PAIDUP_CAPITAL: integer (nullable = true)
 |-- INDUSTRIAL_CLASS: string (nullable = true)
 |-- PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN: string (nullable = true)
 |-- REGISTERED_OFFICE_ADDRESS: string (nullable = true)
 |-- REGISTRAR_OF_COMPANIES: string (nullable = true)
 |-- EMAIL_ADDR: string (nullable = true)
 |-- latest_year_annual_return: string (nullable = true)
 |-- LATEST_YEAR_FINANCIAL_STATEMENT: string (nullable = true)



In [37]:
# changing the data type of date
from pyspark.sql import functions as F

df = df.withColumn("date_of_registration",F.date_format(F.to_date("date_of_registration", "dd-MM-yyyy"), "yyyy-MM-dd"))
df = df.withColumn("latest_year_annual_return",F.date_format(F.to_date("latest_year_annual_return", "dd-MM-yyyy"), "yyyy-MM-dd"))

df = df.withColumn("PAIDUP_CAPITAL",df.PAIDUP_CAPITAL.cast(IntegerType())) # changing string to int
df.select("date_of_registration").printSchema()

root
 |-- date_of_registration: string (nullable = true)



In [46]:
# counting no. of rows
df.count()

1992170

In [50]:
# column names
df.columns

['CORPORATE_IDENTIFICATION_NUMBER',
 'COMPANY_NAME',
 'COMPANY_STATUS',
 'COMPANY_CLASS',
 'COMPANY_CATEGORY',
 'COMPANY_SUB_CATEGORY',
 'date_of_registration',
 'REGISTERED_STATE',
 'AUTHORIZED_CAP',
 'PAIDUP_CAPITAL',
 'INDUSTRIAL_CLASS',
 'PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN',
 'REGISTERED_OFFICE_ADDRESS',
 'REGISTRAR_OF_COMPANIES',
 'EMAIL_ADDR',
 'latest_year_annual_return',
 'LATEST_YEAR_FINANCIAL_STATEMENT']

In [53]:
df.describe('PAIDUP_CAPITAL','AUTHORIZED_CAP').show()

+-------+-------------------+--------------------+
|summary|     PAIDUP_CAPITAL|      AUTHORIZED_CAP|
+-------+-------------------+--------------------+
|  count|            1990095|             1992170|
|   mean|  6626710.304396021|4.2385082192709126E7|
| stddev|5.772859423538962E7|2.9605622147065825E9|
|    min|                  0|                 0.0|
|    max|         2146500000|      999999999990.0|
+-------+-------------------+--------------------+



1. Based on the sector display the number of companies 

In [21]:
# query 1
pandas_df = df.groupBy("COMPANY_CLASS").agg(count(col("CORPORATE_IDENTIFICATION_NUMBER")).alias("no_of_companies"))
pandas_df = pandas_df.dropna()
pandas_df.show()

+--------------------+---------------+
|       COMPANY_CLASS|no_of_companies|
+--------------------+---------------+
|              Public|         137612|
|             Private|        1819264|
|Private(One Perso...|          30216|
+--------------------+---------------+



In [10]:
pandas_df.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

2. List the number of companies that had been registered in each decade

In [38]:
# query 2

dec = df.groupBy(year(col("DATE_OF_REGISTRATION")).alias("year")).agg(count(col("CORPORATE_IDENTIFICATION_NUMBER")).alias("count")).orderBy(year(col("DATE_OF_REGISTRATION")))

dec_df = dec.withColumn("decade", floor(col("year") / 10)*10)

cnt = dec_df.groupBy(col("decade")).agg(sum(col("count")).alias("count")).orderBy(col("decade"))

cnt.show()

+------+------+
|decade| count|
+------+------+
|  null|  2525|
|  1850|     1|
|  1860|     3|
|  1870|    24|
|  1880|    36|
|  1890|    57|
|  1900|   887|
|  1910|   934|
|  1920|  1827|
|  1930|  3378|
|  1940| 10808|
|  1950|  9475|
|  1960| 11818|
|  1970| 30666|
|  1980|143785|
|  1990|344661|
|  2000|439820|
|  2010|978716|
|  2020| 12747|
|  5600|     1|
+------+------+
only showing top 20 rows



In [14]:
cnt.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

3. Find top 5 companies with highest paid up capital as a list in each leap year after 2000

In [39]:
# query 3

def checkLeap(y):
    if y is not None:
        y = int(y)
        if y >= 2000:
            if (((y%4 == 0) & (y%100 != 0)) | (y%400 == 0)):
                return True
            else:
                return False
        else:
            return False
    else:
        return False
    
leap =  udf(checkLeap, BooleanType())

leap_df = df.select(year("date_of_registration").alias("year"),"COMPANY_NAME","PAIDUP_CAPITAL").filter(leap(year(col("date_of_registration"))) == True)

leap_df = leap_df.withColumn("PAIDUP_CAPITAL",leap_df.PAIDUP_CAPITAL.cast(IntegerType())) # changing string to int

partition = Window.partitionBy("year").orderBy(desc("PAIDUP_CAPITAL"))
new_leap = leap_df.withColumn("rank", dense_rank().over(partition))

new_leap = new_leap.filter((col("rank").between(1,5))).orderBy("year","rank")
new_leap.show(2000)

+----+--------------------+--------------+----+
|year|        COMPANY_NAME|PAIDUP_CAPITAL|rank|
+----+--------------------+--------------+----+
|2000|RELOGISTICS INFRA...|    2101913000|   1|
|2000|NANDI ECONOMIC CO...|    2090466920|   2|
|2000|MW UNITEXX LIMITE...|    2087292760|   3|
|2000|NARAYANA HRUDAYAL...|    2043608040|   4|
|2000|PIPAVAV RAILWAY C...|    1960000200|   5|
|2004|EON HADAPSAR INFR...|    2131500000|   1|
|2004|SANYO BPL PRIVATE...|    2089333780|   2|
|2004|GKC PROJECTS LIMI...|    2050787020|   3|
|2004|FIRST FORGE LIMIT...|    2033333330|   4|
|2004|FIRESTAR INTERNAT...|    1946501420|   5|
|2008|BOSCH AUTOMOTIVE ...|    2130769230|   1|
|2008|COFFEE DAY ENTERP...|    2112517190|   2|
|2008|ASHLEY POWERTRAIN...|    2091004190|   3|
|2008|KERALA STATE INFO...|    2049592000|   4|
|2008|RAJAHMUNDRY GODAV...|    2039589000|   5|
|2012|NACHI TECHNOLOGY ...|    2100000000|   1|
|2012|HEXA INDUSTRIES P...|    2091810000|   2|
|2012|MYTRAH VAYU (MANJ...|    208450687

In [17]:
new_leap.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

4. Find top 5 companies that has highest paid up capital in each state

In [25]:
# query 4

cmp = df.select("REGISTERED_STATE","COMPANY_NAME","PAIDUP_CAPITAL")
part = Window.partitionBy("REGISTERED_STATE").orderBy(desc("PAIDUP_CAPITAL"))

new_comp = cmp.withColumn("rank", dense_rank().over(part))
new_comp = new_comp.filter((col("rank").between(1,5)))

new_comp.show(2000)

+--------------------+--------------------+--------------+----+
|    REGISTERED_STATE|        COMPANY_NAME|PAIDUP_CAPITAL|rank|
+--------------------+--------------------+--------------+----+
|            Nagaland|NAGALAND PULP & P...|    1177500700|   1|
|            Nagaland|NAGALAND INDUSTRI...|     232001800|   2|
|            Nagaland|SANGRAHALAYA TIMB...|     200100000|   3|
|            Nagaland|HOTAHOTI WOOD PRO...|     199999600|   4|
|            Nagaland|SHYAMA POWER INDI...|     150761000|   5|
|           Karnataka|BOSCH AUTOMOTIVE ...|    2130769230|   1|
|           Karnataka|MODEL INFRA CORPO...|    2126211590|   2|
|           Karnataka|TECHNOTRENDS AUTO...|    2115100000|   3|
|           Karnataka|COFFEE DAY ENTERP...|    2112517190|   4|
|           Karnataka|DHRUVI SECURITIES...|    2100597940|   5|
|Dadra and Nagra H...|K-LIFESTYLE & IND...|    1017804000|   1|
|Dadra and Nagra H...|JBF INDUSTRIES LI...|     967789390|   2|
|Dadra and Nagra H...|DNH POWER DISTRIB.

In [46]:
new_comp.count()

195

In [20]:
# new_comp.rdd.repartition(1).orderBy("Registered_State").getNumPartitions()

In [None]:
new_comp.rdd.coalesce(1).toDF().write.option("header",True).mode("overwrite").csv("/user/kolpurath6035/viznu")

5. Which state has highest companies registered

In [27]:
# query 5
highest = df.groupBy("REGISTERED_STATE").agg(count(col("CORPORATE_IDENTIFICATION_NUMBER")).alias("no_of_companies")).orderBy(desc("no_of_companies"))
highest.show(1)

+----------------+---------------+
|REGISTERED_STATE|no_of_companies|
+----------------+---------------+
|     Maharashtra|         395282|
+----------------+---------------+
only showing top 1 row



In [24]:
highest.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

6. Find the year on which each state has their maximum registration

In [28]:
# query 6

# df.groupBy("REGISTERED_STATE").agg(count(col("CORPORATE_IDENTIFICATION_NUMBER")).alias("count")).orderBy(desc("count")).show(1000)
date_df = df.select("CORPORATE_IDENTIFICATION_NUMBER",year("date_of_registration").alias("year"),"REGISTERED_STATE")
reg_count = date_df.groupBy("REGISTERED_STATE","year").agg(count(col("CORPORATE_IDENTIFICATION_NUMBER")).alias("count")).orderBy(desc("count"))

ptn = Window.partitionBy("REGISTERED_STATE").orderBy(desc("count"))
reg_count = reg_count.withColumn("rank", rank().over(ptn))
reg_count = reg_count.filter(col("rank") == 1).drop("rank")
reg_count.show(2000)

+--------------------+----+------+
|    REGISTERED_STATE|year| count|
+--------------------+----+------+
|            Nagaland|null|   620|
|           Karnataka|null|125779|
|Dadra and Nagra H...|null|   550|
|              Kerala|null| 56098|
|          Tamil Nadu|null|150871|
|      Andhra Pradesh|null| 33050|
|         Lakshadweep|null|    18|
|      Madhya Pradesh|null| 41318|
|              Punjab|null| 32440|
|             Manipur|null|  1056|
|                 Goa|null|  8942|
|             Mizoram|null|   175|
|    Himachal Pradesh|null|  6591|
|             Haryana|null| 51039|
|   Jammu and Kashmir|null|  5978|
|           Jharkhand|null| 15078|
|              Orissa|null| 27023|
|   Arunachal Pradesh|null|   629|
|             Gujarat|null|105554|
|         Uttaranchal|null|  8924|
|              Sikkim|null|     2|
|               Delhi|null|348230|
|          Chandigarh|null| 15018|
|         Pondicherry|null|  3454|
|Andaman and Nicob...|null|   480|
|           Rajastha

In [27]:
reg_count.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

7. Find the sector that is most common in each state

In [29]:
# query 7

sec = df.groupBy("REGISTERED_STATE","COMPANY_CLASS").agg(count(col("COMPANY_CLASS")).alias("class_count")).orderBy("REGISTERED_STATE","COMPANY_CLASS")

sec = sec.filter(col("COMPANY_CLASS").isNotNull())

wind = Window.partitionBy("REGISTERED_STATE").orderBy(desc("class_count"))
sec = sec.withColumn("rank", rank().over(wind))
sec = sec.filter(col("rank") == 1).orderBy("REGISTERED_STATE").drop("rank") # take the max count of sector in each state; alphabetical order
sec.show()

+--------------------+-------------+-----------+
|    REGISTERED_STATE|COMPANY_CLASS|class_count|
+--------------------+-------------+-----------+
|Andaman and Nicob...|      Private|        460|
|      Andhra Pradesh|      Private|      30538|
|   Arunachal Pradesh|      Private|        582|
|               Assam|      Private|      10816|
|               Bihar|      Private|      32176|
|          Chandigarh|      Private|      13248|
|         Chattisgarh|      Private|       9835|
|Dadra and Nagra H...|      Private|        442|
|       Daman and Diu|      Private|        318|
|               Delhi|      Private|     322131|
|                 Goa|      Private|       8421|
|             Gujarat|      Private|      95834|
|             Haryana|      Private|      47457|
|    Himachal Pradesh|      Private|       5896|
|   Jammu and Kashmir|      Private|       5430|
|           Jharkhand|      Private|      13899|
|           Karnataka|      Private|     117309|
|              Keral

In [31]:
sec.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

8. Based on sub_category give the count for companies in each state

In [30]:
# query 8

cat = df.groupBy("REGISTERED_STATE","COMPANY_SUB_CATEGORY").agg(count(col("COMPANY_SUB_CATEGORY")).alias("category_count")).orderBy("REGISTERED_STATE","COMPANY_SUB_CATEGORY")

cat = cat.filter(col("COMPANY_SUB_CATEGORY").isNotNull())
cat.show()

+--------------------+--------------------+--------------+
|    REGISTERED_STATE|COMPANY_SUB_CATEGORY|category_count|
+--------------------+--------------------+--------------+
|Andaman and Nicob...|    Non-govt company|           477|
|Andaman and Nicob...|  State Govt company|             2|
|Andaman and Nicob...|  Union Govt company|             1|
|      Andhra Pradesh|Guarantee and Ass...|             8|
|      Andhra Pradesh|    Non-govt company|         32742|
|      Andhra Pradesh|  State Govt company|            76|
|      Andhra Pradesh|Subsidiary of For...|           145|
|      Andhra Pradesh|  Union Govt company|             4|
|   Arunachal Pradesh|    Non-govt company|           618|
|   Arunachal Pradesh|  State Govt company|            11|
|               Assam|Guarantee and Ass...|             6|
|               Assam|    Non-govt company|         12004|
|               Assam|  State Govt company|            54|
|               Assam|Subsidiary of For...|             

In [33]:
cat.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

9. List the companies that have been recently enrolled in each state

In [40]:
# query 9

rec_comp = df.select("REGISTERED_STATE","COMPANY_NAME","date_of_registration").filter(col("date_of_registration").isNotNull())
# df.select("date_of_registration").show(2000)
# rec_comp = rec_comp.filter(col("year").isNotNull())

# df.groupBy("REGISTERED_STATE")

wind = Window.partitionBy("REGISTERED_STATE").orderBy(desc("date_of_registration"))
rec_comp = rec_comp.withColumn("rank", dense_rank().over(wind))
rec_comp.filter(col("rank").between(1,5)).show(2000)

+--------------------+--------------------+--------------------+----+
|    REGISTERED_STATE|        COMPANY_NAME|date_of_registration|rank|
+--------------------+--------------------+--------------------+----+
|            Nagaland|JENELO FOODS PRIV...|          2020-01-30|   1|
|            Nagaland|KAPUNU HOME (OPC)...|          2020-01-13|   2|
|            Nagaland|NUH FUTURISTIC PR...|          2020-01-07|   3|
|            Nagaland|AWESOME BUTCHER P...|          2020-01-07|   3|
|            Nagaland|ONGBOU MARKETING ...|          2019-12-18|   4|
|            Nagaland|SEITHOGEI FOUNDAT...|          2019-12-06|   5|
|           Karnataka|RAIVO INFRA PRIVA...|          2020-01-31|   1|
|           Karnataka|PROPOFF MANAGEMEN...|          2020-01-31|   1|
|           Karnataka|GINEE MARKETING A...|          2020-01-31|   1|
|           Karnataka|APHELION HEALTH P...|          2020-01-31|   1|
|           Karnataka|KBN SERVICES PRIV...|          2020-01-31|   1|
|           Karnatak

In [45]:
rec_comp.count()

1989645

In [36]:
# rec_comp.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

10. Find the count of companies per company_status

In [41]:
# query 10

comp_count = df.select("CORPORATE_IDENTIFICATION_NUMBER","COMPANY_STATUS") \
    .groupBy("COMPANY_STATUS").agg(count(col("CORPORATE_IDENTIFICATION_NUMBER")).alias("count")).orderBy(desc("count"))

In [42]:
comp_count.show(2000)

+--------------+-------+
|COMPANY_STATUS|  count|
+--------------+-------+
|          ACTV|1190101|
|          STOF| 688886|
|          UPSO|  41457|
|          AMAL|  24893|
|          CLLP|  13175|
|          DISD|   9769|
|          NAEF|   9286|
|          ULQD|   6460|
|          CLLD|   4874|
|          D455|   2145|
|          LIQD|   1121|
|          DRMT|      2|
|          MLIQ|      1|
+--------------+-------+



In [38]:
comp_count.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

11. Find the top 2 companies per principal business activity in 19th century

In [43]:
# query 11

buis = df.filter(year("date_of_registration").between(1800,1899))


buis = buis.select("PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN","COMPANY_NAME","PAIDUP_CAPITAL")
pt = Window.partitionBy("PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN").orderBy(desc("PAIDUP_CAPITAL"))

buis = buis.withColumn("rank", rank().over(pt)) \
        .filter(col("rank").between(1,2))\
        .drop("rank") 

buis.show() 

+--------------------------------------+--------------------+--------------+
|PRINCIPAL_BUSINESS_ACTIVITY_AS_PER_CIN|        COMPANY_NAME|PAIDUP_CAPITAL|
+--------------------------------------+--------------------+--------------+
|                  Agriculture & allied|NEW CHUMTA TEA CO...|      21400000|
|                  Agriculture & allied|NORTHERN BENGAL T...|      20506000|
|                  Wholesale and ret...|HOWRAH MILLS CO L...|      64443070|
|                  Wholesale and ret...|ALLIANCE UDYOG LT...|       6000000|
|                          Construction|AHMEDNAGAR IMARAT...|        500000|
|                          Construction|BACHITTER SINGH A...|        414000|
|                  Other community s...| WHELER CLUB LTD.   |       2130021|
|                  Other community s...|  OOTACAMUND CLUB   |             0|
|                  Other community s...|OOTACAMUND GYMKHA...|             0|
|                  Other community s...|     MALABAR CLUB   |             0|

In [40]:
buis.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")

12. Find the company with higest paidup capital in each decade

In [44]:
# query 12

p_cap = df.select("COMPANY_NAME", (floor(year("date_of_registration")/10)*10).alias("decade"),"PAIDUP_CAPITAL").orderBy(desc("decade"))

w = Window.partitionBy("decade").orderBy(desc("PAIDUP_CAPITAL"))

p_cap = p_cap.withColumn("rank", rank().over(w)) \
    .filter(col("rank") == 1).orderBy("decade") \
    .drop("rank")
    
p_cap = p_cap.select("decade","COMPANY_NAME","PAIDUP_CAPITAL").filter(col("decade").isNotNull())
p_cap.show()

+------+--------------------+--------------+
|decade|        COMPANY_NAME|PAIDUP_CAPITAL|
+------+--------------------+--------------+
|  1850|JHUNJHUNU COMMERC...|          2000|
|  1860|BOMBAY BURMAH TRA...|     139543800|
|  1870|PENINSULA LAND LI...|     559000000|
|  1880|    D C M LIMITED   |     186746315|
|  1890|CENTURY TEXTILES ...|    1116900000|
|  1900|THE INDIAN HOTELS...|    1189258445|
|  1910|THE KARUR VYSYA B...|    1598641438|
|  1920|THE SOUTH INDIAN ...|    1809722151|
|  1930|SHREENIWAS COTTON...|    1986166600|
|  1940|    J C T LIMITED   |    2096066470|
|  1950|MAWMLUH-CHERRA CE...|    1628285140|
|  1960|RANBAXY LABORATOR...|    2128317485|
|  1970|KERALA STATE ELEC...|    2035518100|
|  1980|AVINASH BHOSALE I...|    2146500000|
|  1990|SONY DADC MANUFAC...|    2142858680|
|  2000|BOI AXA INVESTMEN...|    2146035840|
|  2010|SBESS SERVICES PR...|    2145926750|
|  2020|ATLBATTERY TECHNO...|    1793500000|
|  5600|WHITTLE SPINNING ...|             0|
|  9070|95

In [42]:
p_cap.rdd.coalesce(1).toDF().write.option("header",True).mode("append").csv("/user/kolpurath6035/viznu")