## Import of libraries & dataset

### Import Libraries

In [1]:
!pip install pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, isnull, col, sum, when, input_file_name, udf, to_timestamp, datediff
from pyspark.ml.feature import Bucketizer
import pandas as pd
import numpy as np
from pandas import Series
from numpy import var, mean, sqrt
import seaborn as sb
import matplotlib.pyplot as plt



In [2]:
# Define a schema for the CSV files
schema = StructType([
    StructField("ddl_case_id object", StringType(), True),
    StructField("year", StringType(), True),
    StructField("state_code", StringType(), True),
    StructField("dist_code", StringType(), True),
    StructField("court_no", StringType(), True),
    StructField("cino", StringType(), True),
    StructField("judge_position", StringType(), True),
    StructField("female_defendant", StringType(), True),
    StructField("female_petitioner", StringType(), True),
    StructField("female_adv_def", StringType(), True),
    StructField("female_adv_pet", StringType(), True),
    StructField("type_name", IntegerType(), True),
    StructField("purpose_name", StringType(), True),
    StructField("disp_name", StringType(), True),
    StructField("date_of_filing", StringType(), True),
    StructField("date_of_decision", StringType(), True),
    StructField("date_first_list", StringType(), True),
    StructField("date_last_list", StringType(), True),
    StructField("date_next_list", StringType(), True),
    StructField("act", StringType(), True),
    StructField("section", StringType(), True),
    StructField("billable_ipc", StringType(), True),
    StructField("number_sections_ipc", StringType(), True),
    StructField("criminal", StringType(), True),
    StructField("ddl_filing_judge_id", StringType(), True),
    StructField("ddl_decision_judge_id", StringType(), True),
    StructField("female_judge_filing", StringType(), True),
    StructField("start_date_filing", StringType(), True),
    StructField("end_date_filing", StringType(), True),
    StructField("female_judge_decision", StringType(), True),
    StructField("start_date_decision", StringType(), True),
    StructField("end_date_decision", StringType(), True),
    StructField("judgediff", StringType(), True),
    StructField("pendency_decision_filing", StringType(), True),
    StructField("pendency_first_filing", StringType(), True),
    StructField("pendency_decision_first", StringType(), True),
    StructField("pendency_next_last", StringType(), True),
    StructField("pendency_decision_last", StringType(), True),
    StructField("pendency_decision_next", StringType(), True),
    StructField("pendency_decision_filing_categories", StringType(), True),
    StructField("pendency_first_filing_categories", StringType(), True),
    StructField("filing_judge_tenure",StringType(), True),
    StructField("decision_judge_tenure", StringType(), True),
])

### Import of Cases 2010

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
MAX_MEMORY = '45G'
conf = pyspark.SparkConf().setMaster("local[*]") \
        .set('spark.executor.heartbeatInterval', 10000) \
        .set('spark.network.timeout', 10000) \
        .set("spark.core.connection.ack.wait.timeout", "3600") \
        .set("spark.executor.memory", MAX_MEMORY) \
        .set("spark.driver.memory", MAX_MEMORY)

# Create a Spark session
spark = SparkSession.builder.appName("CSV Addition").config(conf=conf).getOrCreate()

In [5]:

# Define a list of file paths for the CSV files
file_paths = ['/content/drive/MyDrive/Law/cases/2010/cases_2010_final.csv',
              '/content/drive/MyDrive/Law/cases/2011/cases_2011_final.csv',
              '/content/drive/MyDrive/Law/cases/2012/cases_2012_final.csv',
              '/content/drive/MyDrive/Law/cases/2013/cases_2013_final.csv',
              '/content/drive/MyDrive/Law/cases/2014/cases_2014_final.csv',
              '/content/drive/MyDrive/Law/cases/2015/cases_2015_final.csv',
              '/content/drive/MyDrive/Law/cases/2016/cases_2016_final.csv',
              '/content/drive/MyDrive/Law/cases/2017/cases_2017_final.csv',
              '/content/drive/MyDrive/Law/cases/2018/cases_2018_final.csv']

# Read the CSV files into a single dataframe
df_cases_orig = spark.read.csv(
    file_paths,
    schema=schema,
    header=True, # set to False if there's no header
    timestampFormat="yyyy-MM-dd'T'HH:mm:ss", # specify the format for timestamp columns
    #mode="FAILFAST" # fail fast if any errors occur while reading the CSV files
    mode="PERMISIVE" # fail fast if any errors occur while reading the CSV files
)

# Display the combined dataframe
df_cases_orig.show(5)

+--------------------+----+----------+---------+--------+----------------+--------------------+----------------+-----------------+--------------+--------------+---------+------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+---------+------------+-------------------+--------+-------------------+---------------------+-------------------+-----------------+---------------+---------------------+-------------------+-----------------+---------+------------------------+---------------------+-----------------------+------------------+----------------------+----------------------+-----------------------------------+--------------------------------+-------------------+---------------------+
|  ddl_case_id object|year|state_code|dist_code|court_no|            cino|      judge_position|female_defendant|female_petitioner|female_adv_def|female_adv_pet|type_name|purpose_name|disp_name|      date_of_filing|    date_of_de

In [6]:
# Define a list of file paths for the CSV files
file_paths = ['/content/drive/MyDrive/Law/religion/cases_religion_2018.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2017.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2016.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2015.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2014.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2013.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2012.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2011.csv',
              '/content/drive/MyDrive/Law/religion/cases_religion_2010.csv']

# Read the CSV files into a single dataframe
df_cases_orig_religion = spark.read.csv(
    file_paths,
    #schema=schema,
    header=True, # set to False if there's no header
    #timestampFormat="yyyy-MM-dd'T'HH:mm:ss", # specify the format for timestamp columns
    #mode="FAILFAST" # fail fast if any errors occur while reading the CSV files
    mode="PERMISIVE" # fail fast if any errors occur while reading the CSV files
)

# Display the combined dataframe
df_cases_orig_religion.show(5)

+--------------------+----------------+-----------------+--------------+--------------+
|         ddl_case_id|muslim_defendant|muslim_petitioner|muslim_adv_def|muslim_adv_pet|
+--------------------+----------------+-----------------+--------------+--------------+
|01-01-01-20190000...|     0 nonmuslim|      0 nonmuslim|         -9999|             0|
|01-01-01-20190000...|     0 nonmuslim|      0 nonmuslim|         -9999|             0|
|01-01-01-20190000...|     0 nonmuslim|      0 nonmuslim|         -9999|             0|
|01-01-01-20190000...|     0 nonmuslim|      0 nonmuslim|         -9999|             0|
|01-01-01-20190000...|   -9998 unclear|      0 nonmuslim|         -9999|             0|
+--------------------+----------------+-----------------+--------------+--------------+
only showing top 5 rows



In [7]:
df_cases_orig =  df_cases_orig.withColumn("act", col("act").cast("integer"))
df_cases_orig =  df_cases_orig.withColumn("section", col("section").cast("integer"))
#column_name ="section"
#null_count = df_cases_orig.select(sum(col(column_name).isNull().cast("int"))).collect()[0][0]
#print(null_count)

In [8]:
total_rows = df_cases_orig.count()
print(total_rows)

79313266


In [9]:
total_rows_religion = df_cases_orig_religion.count()
print(total_rows_religion)

80935944


In [10]:
df_cases_orig.dtypes

[('ddl_case_id object', 'string'),
 ('year', 'string'),
 ('state_code', 'string'),
 ('dist_code', 'string'),
 ('court_no', 'string'),
 ('cino', 'string'),
 ('judge_position', 'string'),
 ('female_defendant', 'string'),
 ('female_petitioner', 'string'),
 ('female_adv_def', 'string'),
 ('female_adv_pet', 'string'),
 ('type_name', 'int'),
 ('purpose_name', 'string'),
 ('disp_name', 'string'),
 ('date_of_filing', 'string'),
 ('date_of_decision', 'string'),
 ('date_first_list', 'string'),
 ('date_last_list', 'string'),
 ('date_next_list', 'string'),
 ('act', 'int'),
 ('section', 'int'),
 ('billable_ipc', 'string'),
 ('number_sections_ipc', 'string'),
 ('criminal', 'string'),
 ('ddl_filing_judge_id', 'string'),
 ('ddl_decision_judge_id', 'string'),
 ('female_judge_filing', 'string'),
 ('start_date_filing', 'string'),
 ('end_date_filing', 'string'),
 ('female_judge_decision', 'string'),
 ('start_date_decision', 'string'),
 ('end_date_decision', 'string'),
 ('judgediff', 'string'),
 ('pendency

In [11]:
old_header_name = "ddl_case_id object"
new_header_name = "ddl_case_id"

df_cases_orig = df_cases_orig.withColumnRenamed(old_header_name, new_header_name)

In [12]:
df_cases_orig  = df_cases_orig.join(df_cases_orig_religion, on="ddl_case_id", how="left")

In [13]:
df_cases_orig.show()

+--------------------+----+----------+---------+--------+----------------+--------------------+----------------+-----------------+--------------+--------------+---------+------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+-------+------------+-------------------+--------+-------------------+---------------------+-------------------+-----------------+---------------+---------------------+-------------------+-----------------+---------+------------------------+---------------------+-----------------------+------------------+----------------------+----------------------+-----------------------------------+--------------------------------+-------------------+---------------------+----------------+-----------------+--------------+--------------+
|         ddl_case_id|year|state_code|dist_code|court_no|            cino|      judge_position|female_defendant|female_petitioner|female_adv_def|female_adv_pet|type_na

## Preprocessing & Feature Engineering

In [14]:
df_cases_orig.printSchema()

root
 |-- ddl_case_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- dist_code: string (nullable = true)
 |-- court_no: string (nullable = true)
 |-- cino: string (nullable = true)
 |-- judge_position: string (nullable = true)
 |-- female_defendant: string (nullable = true)
 |-- female_petitioner: string (nullable = true)
 |-- female_adv_def: string (nullable = true)
 |-- female_adv_pet: string (nullable = true)
 |-- type_name: integer (nullable = true)
 |-- purpose_name: string (nullable = true)
 |-- disp_name: string (nullable = true)
 |-- date_of_filing: string (nullable = true)
 |-- date_of_decision: string (nullable = true)
 |-- date_first_list: string (nullable = true)
 |-- date_last_list: string (nullable = true)
 |-- date_next_list: string (nullable = true)
 |-- act: integer (nullable = true)
 |-- section: integer (nullable = true)
 |-- billable_ipc: string (nullable = true)
 |-- number_sections_ipc: string (nullable

In [15]:
from pyspark.sql import functions as F

# Assuming df_cases_orig is your Spark DataFrame
df_cases_orig = df_cases_orig.withColumn("court_details", F.concat_ws("_", "state_code", "dist_code", "court_no"))
df_cases_orig = df_cases_orig.withColumn("state_district", F.concat_ws("_", "state_code", "dist_code"))

Filter all cases where decision is available (not required)

In [None]:
#df_cases_orig  = df_cases_orig [df_cases_orig ['pendency_decision_filing_categories'] != 'Undecided']

In [16]:
#print(df_cases_orig.count())
# filter all cases where the case is Undecided and disp_name is not 26
rows_to_remove_mask = (
    (df_cases_orig["pendency_decision_filing_categories"] == "Undecided") &
    ((df_cases_orig["disp_name"] != "26") & (df_cases_orig["disp_name"] != "27"))
)

# Create a copy of the original DataFrame and drop the rows to be removed
df_cases_orig = df_cases_orig[~rows_to_remove_mask]
# Create a copy of the original DataFrame and drop the rows to be removed
#print(df_cases_orig.count())

In [17]:
# create columns for binary classification problem
mapping_multi = {'upto 1 year': 'ontime' ,'1 to 2 years': 'ontime', '2 to 3 years': 'delayed','3 to 4 years': 'delayed','4 to 5 years': 'delayed', '5 to 6 years': 'delayed','6 to 7 years': 'delayed', '7 to 8 years': 'delayed','8 to 9 years': 'delayed', '9 to 10 years': 'delayed',  '10 to 11 years': 'delayed', 'Undecided': 'delayed'}
def map_decision(decision):
    return mapping_multi.get(decision, None)

# Register the UDF
map_decision_udf = F.udf(map_decision)

# Use the UDF to create a new column "pendency_model_multiyear"
df_cases_orig = df_cases_orig.withColumn("pendency_model_multiyear",
                                         map_decision_udf("pendency_decision_filing_categories"))

In [18]:
# Assuming you have a Spark DataFrame called 'df_cases2010'
df_cases_orig = df_cases_orig.withColumn("start_date_filing", to_timestamp("start_date_filing", 'yyyy-MM-dd'))
df_cases_orig = df_cases_orig.withColumn("date_of_filing", to_timestamp("date_of_filing", "yyyy-MM-dd HH:mm:ssXXX"))

df_cases_orig = df_cases_orig.withColumn("filing_judge_tenure_at_filing", when(col("start_date_filing").isNull() | col("date_of_filing").isNull(), None)
                                        .otherwise(
                                            (datediff(col("date_of_filing"), col("start_date_filing"))/30).cast(IntegerType())
                                        ))

In [19]:
unique_jt_values = df_cases_orig.select("filing_judge_tenure_at_filing").distinct()
#totcount = unique_jt_values.count()
#unique_jt_values.show(totcount, truncate=False)

In [20]:
# Assuming you have a Spark DataFrame called 'df_cases2010'
df_cases_orig = df_cases_orig.withColumn("start_date_decision", to_timestamp("start_date_decision", 'yyyy-MM-dd'))
df_cases_orig = df_cases_orig.withColumn("date_of_decision", to_timestamp("date_of_decision", "yyyy-MM-dd HH:mm:ssXXX"))

df_cases_orig = df_cases_orig.withColumn("decision_judge_tenure_at_decision", when(col("start_date_decision").isNull() | col("date_of_decision").isNull(), None)
                                        .otherwise(
                                            (datediff(col("date_of_decision"), col("start_date_decision"))/30).cast(IntegerType())
                                        ))

In [21]:
unique_jt_values = df_cases_orig.select("decision_judge_tenure_at_decision").distinct()
totcount = unique_jt_values.count()
unique_jt_values.show(totcount, truncate=False)

+---------------------------------+
|decision_judge_tenure_at_decision|
+---------------------------------+
|31                               |
|85                               |
|65                               |
|53                               |
|133                              |
|78                               |
|108                              |
|34                               |
|115                              |
|101                              |
|126                              |
|81                               |
|28                               |
|76                               |
|26                               |
|27                               |
|44                               |
|103                              |
|12                               |
|223                              |
|91                               |
|222                              |
|22                               |
|128                              |
|122                        

In [22]:
print(df_cases_orig.count())

79293276


In [23]:
print(df_cases_orig.select("ddl_case_id").distinct().count())


79293276


We will drop following features

* ddl_filing_judge_id
* ddl_decision_judge_id
* start_date_filing
* end_date_filing
* start_date_decision
* end_date_decision

The following are dropped as these dates are converted into categorical variables as periods

* disp_name

The following are dropped as they are just identifiers

* ddl_case_id object
* purpose_name

These variables are not included as deemed not important*

* pendency_decision_first
* pendency_decision_last
* pendency_decision_next
* pendency_next_last

Drop these as these are feature engineered for target variables

* pendency_decision_filing_categories

Drop these as these are feature engineered and used

* dist_code
* court_no


In [24]:
#columns_to_drop = ['ddl_filing_judge_id','ddl_decision_judge_id','start_date_filing','end_date_filing','start_date_decision','end_date_decision',\
#                   'ddl_case_id','purpose_name','date_first_list','date_last_list','date_next_list',\
#                   'pendency_decision_first','pendency_decision_last', 'pendency_decision_next','pendency_next_last',\
#                   'dist_code','court_no']

columns_to_drop = ['ddl_filing_judge_id','ddl_decision_judge_id','start_date_filing','end_date_filing','start_date_decision','end_date_decision',\
                   'purpose_name','date_first_list','date_last_list','date_next_list',\
                   'pendency_decision_first','pendency_decision_last', 'pendency_decision_next','pendency_next_last',\
                   'dist_code','court_no']


# Create a copy of the dataframe and start preparing the same for prediction modelling
df_cases_modified_merge = df_cases_orig.drop(*columns_to_drop)

In [25]:
df_cases_modified_merge.printSchema()

root
 |-- ddl_case_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- cino: string (nullable = true)
 |-- judge_position: string (nullable = true)
 |-- female_defendant: string (nullable = true)
 |-- female_petitioner: string (nullable = true)
 |-- female_adv_def: string (nullable = true)
 |-- female_adv_pet: string (nullable = true)
 |-- type_name: integer (nullable = true)
 |-- disp_name: string (nullable = true)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- act: integer (nullable = true)
 |-- section: integer (nullable = true)
 |-- billable_ipc: string (nullable = true)
 |-- number_sections_ipc: string (nullable = true)
 |-- criminal: string (nullable = true)
 |-- female_judge_filing: string (nullable = true)
 |-- female_judge_decision: string (nullable = true)
 |-- judgediff: string (nullable = true)
 |-- pendency_decision_filing: string (nullable = true)
 |-- pen

## Merge data from modified Act, Section and Judge Position File

In [26]:
df_act = spark.read.csv("/content/drive/MyDrive/Law/keys/act_key.csv", header=True, inferSchema=True)
df_section = spark.read.csv("/content/drive/MyDrive/Law/keys/section_key.csv", header=True, inferSchema=True)
df_type = spark.read.csv("/content/drive/MyDrive/Law/keys/type_name_key.csv", header=True, inferSchema=True)

In [27]:
df_act  = df_act.withColumnRenamed("count", "countact")
df_act = df_act.dropna(how='all')
df_act.show(5)


+--------------------+--------+----+
|               act_s|countact| act|
+--------------------+--------+----+
|                NULL| 11282.0|NULL|
|                   '| 11160.0|   1|
|                  ''| 20887.0|   2|
|'Tamil Nadu City ...|   371.0|   3|
|(BOMBAY) NATIONAL...|     2.0|   4|
+--------------------+--------+----+
only showing top 5 rows



In [28]:
df_act.printSchema()

root
 |-- act_s: string (nullable = true)
 |-- countact: double (nullable = true)
 |-- act: integer (nullable = true)



In [29]:
df_section=  df_section.withColumnRenamed("count", "countsection")
df_section = df_section.dropna(how='all')
df_section.show(5)

+--------------------+------------+-------+
|           section_s|countsection|section|
+--------------------+------------+-------+
|                NULL|     4834909|   NULL|
|!#11bofHindumarri...|           1|      1|
|              !3 (B)|           1|      2|
|          !3(1)(i-a)|           1|      3|
|               !3(3)|           1|      4|
+--------------------+------------+-------+
only showing top 5 rows



In [30]:
df_section.printSchema()

root
 |-- section_s: string (nullable = true)
 |-- countsection: integer (nullable = true)
 |-- section: integer (nullable = true)



In [31]:
df_type = df_type.drop("count")
df_type.show(5)

+----+---------+------------------+
|year|type_name|       type_name_s|
+----+---------+------------------+
|2010|        1|          (m) t.s.|
|2010|        2|              (sc)|
|2010|        3|        ..mact 166|
|2010|        4|04 complaint cases|
|2010|        5|  04 criminal case|
+----+---------+------------------+
only showing top 5 rows



In [32]:
df_cases_modified_mergedact  = df_cases_modified_merge.join(df_act, on="act", how="left")

In [33]:
df_cases_modified_mergedact.show(5)

+-----+--------------------+----+----------+----------------+--------------------+----------------+-----------------+--------------+--------------+---------+---------+-------------------+-------------------+-------+------------+-------------------+--------+-------------------+---------------------+---------+------------------------+---------------------+-----------------------------------+--------------------------------+-------------------+---------------------+----------------+-----------------+--------------+--------------+-------------+--------------+------------------------+-----------------------------+---------------------------------+--------------------+---------+
|  act|         ddl_case_id|year|state_code|            cino|      judge_position|female_defendant|female_petitioner|female_adv_def|female_adv_pet|type_name|disp_name|     date_of_filing|   date_of_decision|section|billable_ipc|number_sections_ipc|criminal|female_judge_filing|female_judge_decision|judgediff|pendency_

In [34]:
print(df_cases_modified_mergedact.count())

79293276


In [35]:
df_cases_modified_mergedactsection  = df_cases_modified_mergedact.join(df_section,on="section", how="left")

In [36]:
df_cases_modified_mergedactsection.show(5)

+-------+-----+--------------------+----+----------+----------------+--------------------+----------------+-----------------+--------------+--------------+---------+---------+-------------------+-------------------+------------+-------------------+--------+-------------------+---------------------+---------+------------------------+---------------------+-----------------------------------+--------------------------------+-------------------+---------------------+----------------+-----------------+--------------+--------------+-------------+--------------+------------------------+-----------------------------+---------------------------------+--------------------+--------+--------------------+------------+
|section|  act|         ddl_case_id|year|state_code|            cino|      judge_position|female_defendant|female_petitioner|female_adv_def|female_adv_pet|type_name|disp_name|     date_of_filing|   date_of_decision|billable_ipc|number_sections_ipc|criminal|female_judge_filing|female_j

In [37]:
df_cases_modified_mergedactsectiontype = df_cases_modified_mergedactsection.join(df_type, on = ["year","type_name"],  how="left")

In [38]:
print(df_cases_modified_mergedactsection.count())

79293276


In [39]:
df_judge = spark.read.csv("/content/drive/MyDrive/Law/judges_clean/judge_position_key.csv", header=True, inferSchema=True)

In [40]:
def merge_judge_position(df):
    df  = df.join(df_judge,on="judge_position", how="left")
    df  = df.drop("judge_position")

    return df

In [41]:
df_cases_modified_mergedactsectionjp = merge_judge_position(df_cases_modified_mergedactsectiontype)

In [42]:
df_cases_modified_mergedactsectionjp.printSchema()

root
 |-- year: string (nullable = true)
 |-- type_name: integer (nullable = true)
 |-- section: integer (nullable = true)
 |-- act: integer (nullable = true)
 |-- ddl_case_id: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- cino: string (nullable = true)
 |-- female_defendant: string (nullable = true)
 |-- female_petitioner: string (nullable = true)
 |-- female_adv_def: string (nullable = true)
 |-- female_adv_pet: string (nullable = true)
 |-- disp_name: string (nullable = true)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- billable_ipc: string (nullable = true)
 |-- number_sections_ipc: string (nullable = true)
 |-- criminal: string (nullable = true)
 |-- female_judge_filing: string (nullable = true)
 |-- female_judge_decision: string (nullable = true)
 |-- judgediff: string (nullable = true)
 |-- pendency_decision_filing: string (nullable = true)
 |-- pendency_first_filing: string (nullable = true)
 

In [43]:
#Transform number_sections_ipc
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import DoubleType
bins = [-1, 1, 2, 3, 4, 5, 10, 15, 20, 25, 30, 35, 40, float('inf')]
labelsipc = ['zero', 'one', 'two', 'three', 'four', '5 to 10', '10 to 15', '15 to 20',
          '20 to 25', '25 to 30', '30 to 35', '35 to 40', '40 plus']

# Define a UDF to map indices to labels
def map_index_to_label_ipc(index):
    if index is not None:
        return labelsipc[index]
    else:
        return "NA"

# Register the UDF
map_index_udf_ipc = udf(map_index_to_label_ipc, StringType())

# Assuming 'pendency_decision_filing' is the column you want to bin
# You can also cast it to DoubleType to match the bin boundaries
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("number_sections_ipc", col("number_sections_ipc").cast(DoubleType()))

# Use Bucketizer to create a new column with the labels
bucketizer = Bucketizer(splits=bins, inputCol="number_sections_ipc", outputCol="number_sections_ipc_temp")
df_cases_modified_mergedactsectionjp = bucketizer.transform(df_cases_modified_mergedactsectionjp)

df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("number_sections_ipc_temp", col("number_sections_ipc_temp").cast(IntegerType()))

# Add a new column "label_values" to the DataFrame using the UDF
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("number_sections_ipc_category", map_index_udf_ipc(df_cases_modified_mergedactsectionjp["number_sections_ipc_temp"]))

In [44]:
df_cases_sample = df_cases_modified_mergedactsectionjp.select("number_sections_ipc", "number_sections_ipc_category")
df_cases_sample = df_cases_sample.filter(~(isnan(col("number_sections_ipc")) | isnull(col("number_sections_ipc"))))
df_cases_sample.show()

+-------------------+----------------------------+
|number_sections_ipc|number_sections_ipc_category|
+-------------------+----------------------------+
|                3.0|                       three|
|                5.0|                     5 to 10|
|                2.0|                         two|
|                1.0|                         one|
|                1.0|                         one|
|                2.0|                         two|
|                4.0|                        four|
|                2.0|                         two|
|                1.0|                         one|
|                1.0|                         one|
|                1.0|                         one|
|                1.0|                         one|
|                1.0|                         one|
|                1.0|                         one|
|                1.0|                         one|
|                1.0|                         one|
|                1.0|          

In [45]:
#Transform pendency
bins = [-1, 0, 1, 3, 6, 9, 12, 24, 36, 48, 60, 72, 84, 96, 108, 120, float('inf')]
labelspf = ['upto 1 month', '1 month', '1 to 3 months', '3 to 6 months', '6 month to 9 month' , '9 month to 1 year', '1 to 2 years', '2 to 3 years', '3 to 4 years', '4 to 5 years',
          '5 to 6 years', '6 to 7 years', '7 to 8 years', '8 to 9 years', '9 to 10 years',
          '10 year +']

# Define a UDF to map indices to labels
def map_index_to_label_pf(index):
    if index is not None:
        return labelspf[index]
    else:
        return "NA"

# Register the UDF
map_index_udf_pf = udf(map_index_to_label_pf, StringType())

# Assuming 'pendency_decision_filing' is the column you want to bin
# You can also cast it to DoubleType to match the bin boundaries
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("pendency_first_filing", col("pendency_first_filing").cast(DoubleType()))

# Use Bucketizer to create a new column with the labels
bucketizer = Bucketizer(splits=bins, inputCol="pendency_first_filing", outputCol="pendency_first_filing_temp")
df_cases_modified_mergedactsectionjp = bucketizer.transform(df_cases_modified_mergedactsectionjp)

df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("pendency_first_filing_temp", col("pendency_first_filing_temp").cast(IntegerType()))

# Add a new column "label_values" to the DataFrame using the UDF
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("pendency_first_filing_category", map_index_udf_pf(df_cases_modified_mergedactsectionjp["pendency_first_filing_temp"]))


In [46]:
df_cases_sample = df_cases_modified_mergedactsectionjp.select("pendency_first_filing", "pendency_first_filing_temp", "pendency_first_filing_category")
df_cases_sample = df_cases_sample.filter(~(isnan(col("pendency_first_filing")) | isnull(col("pendency_first_filing"))))
df_cases_sample.show()

+---------------------+--------------------------+------------------------------+
|pendency_first_filing|pendency_first_filing_temp|pendency_first_filing_category|
+---------------------+--------------------------+------------------------------+
|                  3.0|                         3|                 3 to 6 months|
|                  1.0|                         2|                 1 to 3 months|
|                 11.0|                         5|             9 month to 1 year|
|                  6.0|                         4|            6 month to 9 month|
|                  0.0|                         1|                       1 month|
|                 52.0|                         9|                  4 to 5 years|
|                  2.0|                         2|                 1 to 3 months|
|                  1.0|                         2|                 1 to 3 months|
|                  0.0|                         1|                       1 month|
|               

In [47]:
#Transform filling_judge_tenure_at_filing
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, expr
bins = [-1, 1, 3, 6, 12, 24, 36, 48, 60, 72, 84, 96, 108, 120, 180, float('inf')]
labels = ['upto 1 months', '1 to 3 months', '3 to 6 months', '6 month to 1 year', '1 to 2 years', '2 to 3 years', '3 to 4 years', '4 to 5 years',
          '5 to 6 years', '6 to 7 years', '7 to 8 years', '8 to 9 years', '9 to 10 years',
          '10 to 15 years', '15 years plus']

# Define a UDF to map indices to labels
def map_index_to_label(index):
    if index is not None:
        return labels[index]
    else:
        return "NA"

# Register the UDF
map_index_udf = udf(map_index_to_label, StringType())

# Assuming 'pendency_decision_filing' is the column you want to bin
# You can also cast it to DoubleType to match the bin boundaries
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("filing_judge_tenure_at_filing", col("filing_judge_tenure_at_filing").cast(DoubleType()))

# Use Bucketizer to create a new column with the labels
bucketizer = Bucketizer(splits=bins, inputCol="filing_judge_tenure_at_filing", outputCol="filing_judge_tenure_temp")
df_cases_modified_mergedactsectionjp = bucketizer.transform(df_cases_modified_mergedactsectionjp)

df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("filing_judge_tenure_temp", col("filing_judge_tenure_temp").cast(IntegerType()))

# Add a new column "label_values" to the DataFrame using the UDF
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("filing_judge_tenure_at_filing_category", map_index_udf(df_cases_modified_mergedactsectionjp["filing_judge_tenure_temp"]))

In [48]:
df_cases_sample = df_cases_modified_mergedactsectionjp.select("filing_judge_tenure_at_filing", "filing_judge_tenure_at_filing_category")
df_cases_sample = df_cases_sample.filter(~(isnan(col("filing_judge_tenure_at_filing")) | isnull(col("filing_judge_tenure_at_filing_category"))))
df_cases_sample.show()

+-----------------------------+--------------------------------------+
|filing_judge_tenure_at_filing|filing_judge_tenure_at_filing_category|
+-----------------------------+--------------------------------------+
|                         NULL|                                    NA|
|                         NULL|                                    NA|
|                         NULL|                                    NA|
|                         NULL|                                    NA|
|                         NULL|                                    NA|
|                         NULL|                                    NA|
|                         NULL|                                    NA|
|                         24.0|                          2 to 3 years|
|                          8.0|                     6 month to 1 year|
|                         19.0|                          1 to 2 years|
|                          3.0|                         3 to 6 months|
|     

In [49]:
#Transform decision_judge_tenure_at_decision
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, expr
bins = [-1, 1, 3, 6, 12, 24, 36, 48, 60, 72, 84, 96, 108, 120, 180, float('inf')]
labels = ['upto 1 months', '1 to 3 months', '3 to 6 months', '6 month to 1 year', '1 to 2 years', '2 to 3 years', '3 to 4 years', '4 to 5 years',
          '5 to 6 years', '6 to 7 years', '7 to 8 years', '8 to 9 years', '9 to 10 years',
          '10 to 15 years', '15 years plus']

# Define a UDF to map indices to labels
def map_index_to_label(index):
    if index is not None:
        return labels[index]
    else:
        return "NA"

# Register the UDF
map_index_udf = udf(map_index_to_label, StringType())

# Assuming 'pendency_decision_filing' is the column you want to bin
# You can also cast it to DoubleType to match the bin boundaries
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("decision_judge_tenure_at_decision", col("decision_judge_tenure_at_decision").cast(DoubleType()))

# Use Bucketizer to create a new column with the labels
bucketizer = Bucketizer(splits=bins, inputCol="decision_judge_tenure_at_decision", outputCol="decision_judge_tenure_temp")
df_cases_modified_mergedactsectionjp = bucketizer.transform(df_cases_modified_mergedactsectionjp)

df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("decision_judge_tenure_temp", col("decision_judge_tenure_temp").cast(IntegerType()))

# Add a new column "label_values" to the DataFrame using the UDF
df_cases_modified_mergedactsectionjp = df_cases_modified_mergedactsectionjp.withColumn("decision_judge_tenure_at_decision_category", map_index_udf(df_cases_modified_mergedactsectionjp["decision_judge_tenure_temp"]))

In [50]:
df_cases_sample = df_cases_modified_mergedactsectionjp.select("decision_judge_tenure_at_decision", "decision_judge_tenure_at_decision_category")
df_cases_sample = df_cases_sample.filter(~(isnan(col("decision_judge_tenure_at_decision")) | isnull(col("decision_judge_tenure_at_decision_category"))))
df_cases_sample.show()

+---------------------------------+------------------------------------------+
|decision_judge_tenure_at_decision|decision_judge_tenure_at_decision_category|
+---------------------------------+------------------------------------------+
|                             NULL|                                        NA|
|                             NULL|                                        NA|
|                             NULL|                                        NA|
|                             NULL|                                        NA|
|                             NULL|                                        NA|
|                             NULL|                                        NA|
|                             NULL|                                        NA|
|                             10.0|                         6 month to 1 year|
|                             NULL|                                        NA|
|                             NULL|                 

In [51]:
df_cases_modified_mergedactsectionjp.printSchema()

root
 |-- year: string (nullable = true)
 |-- type_name: integer (nullable = true)
 |-- section: integer (nullable = true)
 |-- act: integer (nullable = true)
 |-- ddl_case_id: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- cino: string (nullable = true)
 |-- female_defendant: string (nullable = true)
 |-- female_petitioner: string (nullable = true)
 |-- female_adv_def: string (nullable = true)
 |-- female_adv_pet: string (nullable = true)
 |-- disp_name: string (nullable = true)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- billable_ipc: string (nullable = true)
 |-- number_sections_ipc: double (nullable = true)
 |-- criminal: string (nullable = true)
 |-- female_judge_filing: string (nullable = true)
 |-- female_judge_decision: string (nullable = true)
 |-- judgediff: string (nullable = true)
 |-- pendency_decision_filing: string (nullable = true)
 |-- pendency_first_filing: double (nullable = true)
 

In [52]:
columns_to_drop_postmerge = ['pendency_first_filing_category', 'disp_name','type_name','number_sections_ipc',
                             'filing_judge_tenure','filing_judge_tenure_temp','pendency_first_filing_temp','number_sections_ipc_temp']

#columns_to_drop_postmerge = ['billable_ipc','type_name',','type_name','act','section','cino','disp_name','number_sections_ipc','pendency_decision_filling','pendency_first_filing','filing_judge_tenure','pendency_first_filing_categories','filing_judge_tenure_temp','pendency_first_filing_temp','number_sections_ipc_temp']
df_cases_modified_final = df_cases_modified_mergedactsectionjp.drop(*columns_to_drop_postmerge)

In [53]:
df_cases_modified_final.printSchema()

root
 |-- year: string (nullable = true)
 |-- section: integer (nullable = true)
 |-- act: integer (nullable = true)
 |-- ddl_case_id: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- cino: string (nullable = true)
 |-- female_defendant: string (nullable = true)
 |-- female_petitioner: string (nullable = true)
 |-- female_adv_def: string (nullable = true)
 |-- female_adv_pet: string (nullable = true)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- billable_ipc: string (nullable = true)
 |-- criminal: string (nullable = true)
 |-- female_judge_filing: string (nullable = true)
 |-- female_judge_decision: string (nullable = true)
 |-- judgediff: string (nullable = true)
 |-- pendency_decision_filing: string (nullable = true)
 |-- pendency_first_filing: double (nullable = true)
 |-- pendency_decision_filing_categories: string (nullable = true)
 |-- pendency_first_filing_categories: string (nullable = true)
 |--

In [54]:
#columns_to_drop_postmerge = ['section','act','year','cino', 'date_of_filing', 'judge_position', 'date_of_decision','pendency_first_filing_categories', 'decision_judge_tenure']
columns_to_drop_postmerge = ['section','act','year','cino', 'judge_position', 'pendency_first_filing_categories', 'decision_judge_tenure']


df_cases_modified_final = df_cases_modified_final.drop(*columns_to_drop_postmerge)

In [55]:
df_cases_modified_final.printSchema()

root
 |-- ddl_case_id: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- female_defendant: string (nullable = true)
 |-- female_petitioner: string (nullable = true)
 |-- female_adv_def: string (nullable = true)
 |-- female_adv_pet: string (nullable = true)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- billable_ipc: string (nullable = true)
 |-- criminal: string (nullable = true)
 |-- female_judge_filing: string (nullable = true)
 |-- female_judge_decision: string (nullable = true)
 |-- judgediff: string (nullable = true)
 |-- pendency_decision_filing: string (nullable = true)
 |-- pendency_first_filing: double (nullable = true)
 |-- pendency_decision_filing_categories: string (nullable = true)
 |-- muslim_defendant: string (nullable = true)
 |-- muslim_petitioner: string (nullable = true)
 |-- muslim_adv_def: string (nullable = true)
 |-- muslim_adv_pet: string (nullable = true)
 |-- court_details: string 

In [56]:
columns_to_drop_postmerge = ['pendency_decision_filing_categories','decision_judge_tenure','countact', 'count','decision_judge_tenure_temp', 'decision_judge_tenure_at_decision_category','filing_judge_tenure_at_filing_category']

df_cases_modified = df_cases_modified_final.drop(*columns_to_drop_postmerge)

In [57]:
df_cases_modified.printSchema()

root
 |-- ddl_case_id: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- female_defendant: string (nullable = true)
 |-- female_petitioner: string (nullable = true)
 |-- female_adv_def: string (nullable = true)
 |-- female_adv_pet: string (nullable = true)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- billable_ipc: string (nullable = true)
 |-- criminal: string (nullable = true)
 |-- female_judge_filing: string (nullable = true)
 |-- female_judge_decision: string (nullable = true)
 |-- judgediff: string (nullable = true)
 |-- pendency_decision_filing: string (nullable = true)
 |-- pendency_first_filing: double (nullable = true)
 |-- muslim_defendant: string (nullable = true)
 |-- muslim_petitioner: string (nullable = true)
 |-- muslim_adv_def: string (nullable = true)
 |-- muslim_adv_pet: string (nullable = true)
 |-- court_details: string (nullable = false)
 |-- state_district: string (nullable = false)
 

In [58]:
df_cases_modified= df_cases_modified.fillna("NA")

In [59]:
df_cases_modified = df_cases_modified.filter(col("criminal") == 1)


In [60]:
print(df_cases_modified.count())


24205125


In [61]:
df_cases_modified.show(20)

+--------------------+----------+----------------+-----------------+--------------+--------------+-------------------+-------------------+------------+--------+-------------------+---------------------+---------+------------------------+---------------------+----------------+-----------------+--------------+--------------+-------------+--------------+------------------------+-----------------------------+---------------------------------+--------------------+---------+------------+-----------+--------------------+----------------------------+
|         ddl_case_id|state_code|female_defendant|female_petitioner|female_adv_def|female_adv_pet|     date_of_filing|   date_of_decision|billable_ipc|criminal|female_judge_filing|female_judge_decision|judgediff|pendency_decision_filing|pendency_first_filing|muslim_defendant|muslim_petitioner|muslim_adv_def|muslim_adv_pet|court_details|state_district|pendency_model_multiyear|filing_judge_tenure_at_filing|decision_judge_tenure_at_decision|         

In [62]:
df_cases_modified.printSchema()

root
 |-- ddl_case_id: string (nullable = false)
 |-- state_code: string (nullable = false)
 |-- female_defendant: string (nullable = false)
 |-- female_petitioner: string (nullable = false)
 |-- female_adv_def: string (nullable = false)
 |-- female_adv_pet: string (nullable = false)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- billable_ipc: string (nullable = false)
 |-- criminal: string (nullable = false)
 |-- female_judge_filing: string (nullable = false)
 |-- female_judge_decision: string (nullable = false)
 |-- judgediff: string (nullable = false)
 |-- pendency_decision_filing: string (nullable = false)
 |-- pendency_first_filing: double (nullable = true)
 |-- muslim_defendant: string (nullable = false)
 |-- muslim_petitioner: string (nullable = false)
 |-- muslim_adv_def: string (nullable = false)
 |-- muslim_adv_pet: string (nullable = false)
 |-- court_details: string (nullable = false)
 |-- state_district: string (nul

In [63]:
columns_to_drop = ['countsection','pendency_decision_filing','criminal']

# Create a copy of the dataframe and start preparing the same for prediction modelling
df_cases_modified = df_cases_modified.drop(*columns_to_drop)

In [64]:
df_cases_modified.printSchema()

root
 |-- ddl_case_id: string (nullable = false)
 |-- state_code: string (nullable = false)
 |-- female_defendant: string (nullable = false)
 |-- female_petitioner: string (nullable = false)
 |-- female_adv_def: string (nullable = false)
 |-- female_adv_pet: string (nullable = false)
 |-- date_of_filing: timestamp (nullable = true)
 |-- date_of_decision: timestamp (nullable = true)
 |-- billable_ipc: string (nullable = false)
 |-- female_judge_filing: string (nullable = false)
 |-- female_judge_decision: string (nullable = false)
 |-- judgediff: string (nullable = false)
 |-- pendency_first_filing: double (nullable = true)
 |-- muslim_defendant: string (nullable = false)
 |-- muslim_petitioner: string (nullable = false)
 |-- muslim_adv_def: string (nullable = false)
 |-- muslim_adv_pet: string (nullable = false)
 |-- court_details: string (nullable = false)
 |-- state_district: string (nullable = false)
 |-- pendency_model_multiyear: string (nullable = false)
 |-- filing_judge_tenure_a

In [65]:
# Covid Impacted Cases Remove
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F

# Define cutoff dates as timestamp literals
cutoff_filing_date = F.lit("2018-03-25").cast(TimestampType())
cutoff_decision_date = F.lit("2020-03-25").cast(TimestampType())

# Apply correct parentheses to separate logical clauses
df_filtered = df_cases_modified.filter(
    ((col("date_of_filing") > cutoff_filing_date) & (col("date_of_decision").isNull())) |
    (col("date_of_decision") > cutoff_decision_date)
)

# Count filtered rows
filtered_count = df_filtered.count()
print(f"Number of rows matching the filter: {filtered_count}")


Number of rows matching the filter: 1815170


In [66]:
from pyspark.sql.functions import max

# Get the maximum (latest) decision date
latest_decision_date = df_filtered.agg(max("date_of_decision").alias("latest_decision_date")).collect()[0]["latest_decision_date"]

print(f"The latest decision date is: {latest_decision_date}")


The latest decision date is: 2020-12-31 00:00:00


In [67]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import TimestampType

# Define the cutoff decision date
cutoff_decision_date = lit("2020-03-25").cast(TimestampType())

# Filter and count cases with decision after cutoff date
count_after_cutoff = df_cases_modified.filter(
    col("date_of_decision") > cutoff_decision_date
).count()

print(f"Number of cases decided after 2020-03-25: {count_after_cutoff}")


Number of cases decided after 2020-03-25: 8778


In [68]:
df_cases_modified = df_cases_modified.subtract(df_filtered)
df_cases_modified.count()

22389955

In [69]:
columns_to_drop = ['ddl_case_id','date_of_decision','date_of_filing']

# Create a copy of the dataframe and start preparing the same for prediction modelling
df_cases_modified = df_cases_modified.drop(*columns_to_drop)

In [70]:
state_code_df = df_cases_modified.groupBy("type_name_s").count()
totcount = state_code_df.count()
state_code_df.show(totcount, truncate=False)
print(totcount)

+--------------------------------------------------+--------+
|type_name_s                                       |count   |
+--------------------------------------------------+--------+
|spl.cri.m.a.                                      |1466    |
|transfer application (criminal)                   |180     |
|488 cr. p c                                       |78      |
|misc dj asj                                       |117     |
|misc. ( criminal)                                 |775     |
|g r case ( summon)                                |367     |
|1cc                                               |367     |
|summons private cases ss                          |246     |
|g.r. supplementary-1                              |21      |
|criminal misc. petition                           |3621    |
|mv act                                            |693     |
|cril. misc. bail                                  |372     |
|u.c. case                                         |24      |
|cr cc  

In [71]:
jp_df = df_cases_modified.groupBy("pendency_model_multiyear").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+------------------------+--------+
|pendency_model_multiyear|count   |
+------------------------+--------+
|ontime                  |13446694|
|delayed                 |8943261 |
+------------------------+--------+

2


In [72]:
jp_df = df_cases_modified.groupBy("pendency_first_filing").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+---------------------+-------+
|pendency_first_filing|count  |
+---------------------+-------+
|8.0                  |189992 |
|70.0                 |9721   |
|67.0                 |11873  |
|0.0                  |9838782|
|69.0                 |10401  |
|7.0                  |227776 |
|112.0                |91     |
|108.0                |656    |
|88.0                 |4841   |
|49.0                 |24993  |
|101.0                |2651   |
|98.0                 |2708   |
|116.0                |25     |
|29.0                 |52521  |
|107.0                |1071   |
|64.0                 |13626  |
|75.0                 |7816   |
|47.0                 |27381  |
|42.0                 |32931  |
|44.0                 |30202  |
|62.0                 |14873  |
|35.0                 |41696  |
|NULL                 |518281 |
|96.0                 |3070   |
|18.0                 |85397  |
|86.0                 |5265   |
|80.0                 |6575   |
|1.0                  |3820283|
|39.0   

In [73]:
jp_df = df_cases_modified.groupBy("filing_judge_tenure_at_filing").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+-----------------------------+--------+
|filing_judge_tenure_at_filing|count   |
+-----------------------------+--------+
|147.0                        |79      |
|8.0                          |368866  |
|70.0                         |3852    |
|67.0                         |4562    |
|0.0                          |734355  |
|69.0                         |3849    |
|7.0                          |412183  |
|154.0                        |70      |
|112.0                        |347     |
|124.0                        |115     |
|128.0                        |53      |
|108.0                        |349     |
|234.0                        |44      |
|133.0                        |12      |
|181.0                        |47      |
|88.0                         |753     |
|49.0                         |11067   |
|101.0                        |472     |
|98.0                         |493     |
|116.0                        |270     |
|29.0                         |91742   |
|107.0          

In [None]:
jp_df = df_cases_modified.groupBy("decision_judge_tenure_at_decision").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+---------------------------------+--------+
|decision_judge_tenure_at_decision|count   |
+---------------------------------+--------+
|8.0                              |333748  |
|70.0                             |2822    |
|67.0                             |3669    |
|0.0                              |544284  |
|69.0                             |2759    |
|7.0                              |374240  |
|112.0                            |327     |
|124.0                            |77      |
|128.0                            |43      |
|235.0                            |54      |
|108.0                            |321     |
|180.0                            |55      |
|234.0                            |43      |
|133.0                            |12      |
|88.0                             |494     |
|49.0                             |6348    |
|101.0                            |508     |
|98.0                             |470     |
|116.0                            |195     |
|29.0     

In [None]:
state_code_df = df_cases_modified.groupBy("state_code").count()
totcount = state_code_df.count()
state_code_df.show(totcount, truncate=False)
print(totcount)

+----------+-------+
|state_code|count  |
+----------+-------+
|7         |642181 |
|15        |77389  |
|11        |469397 |
|29        |607739 |
|3         |1543610|
|30        |33321  |
|8         |1885966|
|22        |670806 |
|16        |818362 |
|5         |197266 |
|31        |3170   |
|18        |361277 |
|27        |26875  |
|17        |1407682|
|26        |392255 |
|6         |467655 |
|19        |1744   |
|23        |1740110|
|25        |33191  |
|33        |132    |
|9         |1234467|
|24        |11353  |
|32        |4133   |
|1         |2896505|
|20        |39257  |
|10        |730057 |
|4         |1559609|
|12        |24755  |
|13        |3254237|
|14        |604936 |
|21        |6059   |
|2         |644459 |
+----------+-------+

32


In [None]:
jp_df = df_cases_modified.groupBy("state_district").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+--------------+------+
|state_district|count |
+--------------+------+
|23_27         |30885 |
|8_6           |85070 |
|23_15         |52134 |
|11_11         |18075 |
|10_32         |18785 |
|22_13         |15616 |
|1_30          |119320|
|7_15          |15491 |
|13_58         |70465 |
|2_7           |36895 |
|5_4           |23354 |
|6_12          |18065 |
|8_19          |25389 |
|2_23          |48759 |
|10_16         |5149  |
|17_42         |9948  |
|14_11         |21102 |
|26_8          |54403 |
|11_21         |12660 |
|29_7          |40575 |
|10_19         |6482  |
|11_17         |16917 |
|1_21          |219459|
|5_11          |18029 |
|20_5          |3835  |
|20_3          |4361  |
|2_19          |20747 |
|8_30          |56473 |
|1_33          |12670 |
|10_12         |19551 |
|10_25         |12484 |
|22_15         |13870 |
|7_2           |23133 |
|8_14          |33890 |
|10_27         |25084 |
|16_19         |18823 |
|23_44         |77153 |
|23_16         |26535 |
|18_19         |

In [None]:
jp_df = df_cases_modified.groupBy("court_details").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+-------------+------+
|court_details|count |
+-------------+------+
|23_49_2      |1371  |
|9_26_13      |1854  |
|10_9_12      |4246  |
|16_7_3       |5311  |
|17_39_6      |2065  |
|10_13_4      |328   |
|11_7_9       |57    |
|13_21_5      |2700  |
|10_4_23      |2567  |
|17_24_7      |2921  |
|6_9_11       |1369  |
|23_1_4       |2453  |
|10_28_14     |123   |
|29_3_18      |150   |
|13_14_2      |8270  |
|14_9_6       |3918  |
|1_9_15       |4352  |
|2_11_21      |3735  |
|16_7_18      |1     |
|22_8_8       |1776  |
|17_13_12     |849   |
|17_27_17     |1152  |
|9_22_33      |250   |
|3_14_1       |9848  |
|22_14_1      |8967  |
|3_23_14      |1367  |
|3_26_20      |3243  |
|15_9_5       |77    |
|4_9_23       |10693 |
|1_34_4       |1099  |
|2_17_5       |2292  |
|6_13_4       |17276 |
|1_5_6        |4624  |
|17_23_16     |2336  |
|1_8_9        |2799  |
|2_23_19      |2675  |
|29_3_4       |2538  |
|29_8_11      |2951  |
|9_13_16      |2781  |
|10_24_17     |1545  |
|18_18_3   

In [None]:
jp_df = df_cases_modified.groupBy("female_judge_filing").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+-------------------+--------+
|female_judge_filing|count   |
+-------------------+--------+
|NA                 |12852324|
|1 female           |2280612 |
|0 nonfemale        |6975643 |
|-9998 unclear      |281376  |
+-------------------+--------+

4


In [None]:
jp_df = df_cases_modified.groupBy("female_judge_decision").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+---------------------+--------+
|female_judge_decision|count   |
+---------------------+--------+
|NA                   |11454664|
|1 female             |2855469 |
|0 nonfemale          |7750307 |
|-9998 unclear        |329515  |
+---------------------+--------+

4


In [None]:
jp_df = df_cases_modified.groupBy("judgediff").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+---------+--------+
|judgediff|count   |
+---------+--------+
|1.0      |3189790 |
|NA       |13335561|
|0.0      |5864604 |
+---------+--------+

3


In [None]:
jp_df = df_cases_modified.groupBy("billable_ipc").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+-------------------------+-------+
|billable_ipc             |count  |
+-------------------------+-------+
|NA                       |8969495|
|not applicable to section|525330 |
|unclear, need to check   |17520  |
|depends                  |438225 |
|bailable                 |7122557|
|non-bailable             |5316828|
+-------------------------+-------+

6


In [None]:
jp_df = df_cases_modified.groupBy("number_sections_ipc_category").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+----------------------------+-------+
|number_sections_ipc_category|count  |
+----------------------------+-------+
|10 to 15                    |108374 |
|two                         |2788062|
|5 to 10                     |2177397|
|NA                          |6342006|
|four                        |1634816|
|one                         |7062394|
|three                       |2269065|
|25 to 30                    |146    |
|20 to 25                    |1131   |
|15 to 20                    |6536   |
|30 to 35                    |17     |
|35 to 40                    |8      |
|40 plus                     |3      |
+----------------------------+-------+

13


In [None]:
jp_df = df_cases_modified.groupBy("judge_position_s").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+-----------------------------------------------------+-------+
|judge_position_s                                     |count  |
+-----------------------------------------------------+-------+
|cantonment court                                     |6211   |
|lunawada                                             |2340   |
|spcl court - bharatpur district hq                   |2781   |
|acd jaipur metro hq                                  |260    |
|a.d.j.,salipur                                       |76     |
|jjb criminal                                         |41     |
|vaghaicourt                                          |276    |
|4-additional district judge                          |941    |
|additional district judge                            |345258 |
|additional court                                     |24324  |
|jcj court miryalaguda                                |4480   |
|jjb bharatpur district hq                            |59     |
|special court for exclusive trial of ca

In [None]:
jp_df = df_cases_modified.groupBy("female_defendant").count()
jp_df.show()

+----------------+--------+
|female_defendant|   count|
+----------------+--------+
|           -9998| 6074111|
|           -9999|   39799|
|               0|14513863|
|               1| 1762182|
+----------------+--------+



In [None]:
jp_df = df_cases_modified.groupBy("female_petitioner").count()
jp_df.show()

+------------------+--------+
| female_petitioner|   count|
+------------------+--------+
|-9999 missing name|   26307|
|          1 female| 3000097|
|     -9998 unclear|12040291|
|            0 male| 7323260|
+------------------+--------+



In [None]:
jp_df = df_cases_modified.groupBy("female_adv_def").count()
jp_df.show()

+--------------+--------+
|female_adv_def|   count|
+--------------+--------+
|         -9998|  564358|
|         -9999|17813275|
|             0| 3749531|
|             1|  262791|
+--------------+--------+



In [None]:
jp_df = df_cases_modified.groupBy("female_adv_pet").count()
jp_df.show()

+--------------+-------+
|female_adv_pet|  count|
+--------------+-------+
|         -9998|3539801|
|         -9999|9755293|
|             0|8064274|
|             1|1030587|
+--------------+-------+



In [None]:
jp_df = df_cases_modified.groupBy("muslim_defendant").count()
jp_df.show()

+------------------+--------+
|  muslim_defendant|   count|
+------------------+--------+
|          1 muslim| 2531952|
|-9999 missing name|   39799|
|       0 nonmuslim|14786913|
|     -9998 unclear| 5031291|
+------------------+--------+



In [None]:
jp_df = df_cases_modified.groupBy("muslim_petitioner").count()
jp_df.show()

+------------------+--------+
| muslim_petitioner|   count|
+------------------+--------+
|          1 muslim| 1443836|
|-9999 missing name|   26307|
|       0 nonmuslim| 9404541|
|     -9998 unclear|11515271|
+------------------+--------+



In [None]:
jp_df = df_cases_modified.groupBy("muslim_adv_def").count()
jp_df.show()

+--------------+--------+
|muslim_adv_def|   count|
+--------------+--------+
|         -9998|  176596|
|         -9999|17813275|
|             0| 4018146|
|             1|  381938|
+--------------+--------+



In [None]:
jp_df = df_cases_modified.groupBy("muslim_adv_pet").count()
jp_df.show()

+--------------+--------+
|muslim_adv_pet|   count|
+--------------+--------+
|         -9998|  377129|
|         -9999| 9755293|
|             0|11417193|
|             1|  840340|
+--------------+--------+



In [None]:
jp_df = df_cases_modified.groupBy("act_s").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+--------------------------+--------+
|act_s                     |count   |
+--------------------------+--------+
|Code of Criminal Procedure|6342006 |
|The Indian Penal Code     |16047949|
+--------------------------+--------+

2


In [None]:
jp_df = df_cases_modified.groupBy("section_s").count()
totcount = jp_df.count()
jp_df.show(totcount, truncate=False)
print(totcount)

+---------+-------+
|section_s|count  |
+---------+-------+
|467      |230746 |
|296      |364    |
|125      |1023416|
|451      |172432 |
|7        |11352  |
|447      |26936  |
|51       |5636   |
|124      |1085   |
|307      |543989 |
|475      |435    |
|334      |320    |
|169      |964    |
|205      |1057   |
|272      |23885  |
|442      |207    |
|470      |99     |
|462      |86     |
|15       |4925   |
|448      |24876  |
|232      |3235   |
|54       |1424   |
|234      |2030   |
|383      |368    |
|282      |148    |
|155      |56780  |
|483      |184    |
|317      |12698  |
|132      |2017   |
|154      |876    |
|200      |5092   |
|495      |2946   |
|388      |2434   |
|428      |833    |
|11       |3134   |
|101      |1355   |
|279      |1180416|
|415      |781    |
|433      |112    |
|323      |399175 |
|138      |338110 |
|387      |21599  |
|351      |184    |
|361      |278    |
|309      |19646  |
|29       |5684   |
|419      |9359   |
|69       |784    |


In [None]:
df_cases_modified.show()

+----------+----------------+-----------------+--------------+--------------+------------+-------------------+---------------------+---------+---------------------+----------------+-----------------+--------------+--------------+-------------+--------------+------------------------+-----------------------------+---------------------------------+--------------------+---------+------------+--------------------+----------------------------+
|state_code|female_defendant|female_petitioner|female_adv_def|female_adv_pet|billable_ipc|female_judge_filing|female_judge_decision|judgediff|pendency_first_filing|muslim_defendant|muslim_petitioner|muslim_adv_def|muslim_adv_pet|court_details|state_district|pendency_model_multiyear|filing_judge_tenure_at_filing|decision_judge_tenure_at_decision|               act_s|section_s| type_name_s|    judge_position_s|number_sections_ipc_category|
+----------+----------------+-----------------+--------------+--------------+------------+-------------------+------

### Save as one file

In [None]:
df_cases_modified.coalesce(1).write.csv("automl_2010to2018_final_criminal.csv", header=True, mode="overwrite")

In [None]:
!cat /content/automl_2010to2018_final_criminal.csv/*.csv > /content/automl_2010to18_final_criminal.csv

In [None]:
!cp /content/automl_2010to18_final_criminal.csv /content/drive/MyDrive/Law/Model1cData_2010to2018_merged_criminal.csv