![alt text](Logo_Capgemini-1.jpg)

# <center> PySpark MINI-PROJECT </center>


---



### <center> Code:</center>

---

1. *Set the pyspark environment variables*

In [1]:
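import os
import sys

# Point PySpark at the local Spark and Hadoop installations
os.environ['SPARK_HOME'] = r"C:\spark"
os.environ['HADOOP_HOME'] = r"C:\hadoop"
# Only read by the pyspark launcher script; it has no effect inside a running notebook
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
# Use the interpreter running this notebook for both the workers and the driver
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable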

2. *Import Libraries*

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.sql.functions import year, quarter, month, dayofweek, to_date, monotonically_increasing_id, date_format

3. *Session Created*

In [3]:
spark = SparkSession.builder \
    .appName("Statistical Analysis of Campus Recruitment") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()


4. *Load the CSV data into DataFrames*

In [None]:
# Load CSV Files
campus_company_df = spark.read.csv(r"C:\MINI-PROJECT\Campus_Source1_Company.csv", header=True, inferSchema=True)
campus_college_df = spark.read.csv(r"C:\MINI-PROJECT\Campus_Source2_College.csv", header=True, inferSchema=True)

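# Fill NULLs in numeric columns with 0 for computation (if needed).
# na.fill returns a new DataFrame, so the result must be assigned back.
campus_company_df = campus_company_df.na.fill(0)
campus_college_df = campus_college_df.na.fill(0)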


In [None]:
# Parse the Date strings (dd-MMM-yy) into DateType; adjust the pattern if the source format differs
campus_company_df = campus_company_df.withColumn("Date", to_date("Date", "dd-MMM-yy"))
#campus_company_df.show()
campus_college_df = campus_college_df.withColumn("Date", to_date("Date", "dd-MMM-yy"))
#campus_college_df.show()
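
As a rough cross-check of what a `dd-MMM-yy` pattern accepts, the sketch below parses a string of that shape with Python's standard library instead of Spark. The sample value and the helper name `parse_campus_date` are assumptions for illustration and are not taken from the CSV files. Matching of `%b` assumes an English locale, and two-digit-year handling may differ between Python and Spark for years far from the present.

```python
from datetime import date, datetime

def parse_campus_date(text: str) -> date:
    """Parse a day-month-year string such as '10-Jan-14' (hypothetical sample)."""
    # %d = day of month, %b = abbreviated month name, %y = two-digit year
    return datetime.strptime(text, "%d-%b-%y").date()

print(parse_campus_date("10-Jan-14"))  # 2014-01-10
```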

5. *Create Dimension Tables from Data Loaded into DataFrames*

- *Table 1: College Dimension Table*

In [6]:
# Create College dimension table
college_df = campus_college_df.select(
    "College_Name",
    "Location"
).distinct()

# Add a College_ID surrogate key column to the College dimension table
college_df = college_df.withColumn("College_ID", monotonically_increasing_id()+10000)

# Show College dimension table
college_df.show()

+--------------------+--------------+----------+
|        College_Name|      Location|College_ID|
+--------------------+--------------+----------+
|BIRLA INSTITUTE O...|     RAJASTHAN|     10000|
|       IIT KHARAGPUR|   WEST BENGAL|     10001|
|COLLEGE OF ENGINE...|   MAHARASHTRA|     10002|
|       IIIT HYDERBAD|ANDHRA PRADESH|     10003|
|           IIT DELHI|         DELHI|     10004|
|        IIT GUWAHATI|         ASSAM|     10005|
|     MNNIT ALLAHABAD| UTTAR PRADESH|     10006|
|VISHWESHWARAYYA T...|     KARNATAKA|     10007|
|          IIT KANPUR| UTTAR PRADESH|     10008|
|          IIT MUMBAI|   MAHARASHTRA|     10009|
|      SRM UNIVERSITY|     TELANGANA|     10010|
|MANIPAL INSTITUTE...|     KARNATAKA|     10011|
|         IIT ROORKEE| UTTAR PRADESH|     10012|
|DELHI COLLEGE OF ...|         DELHI|     10013|
|VELLORE INSTITUTE...|    TAMIL NADU|     10014|
|PSG COLLEGE OF TE...|    TAMIL NADU|     10015|
|INSTITUTE OF TECH...| UTTAR PRADESH|     10016|
|          IIT MADRA
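
One caveat on these surrogate keys: `monotonically_increasing_id()` guarantees increasing, unique values, but not consecutive ones. Per the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python sketch below mimics that layout; `spark_style_id` is a hypothetical helper, not a Spark API. Consecutive IDs like those in the output above are what this layout yields when every row sits in partition 0.

```python
def spark_style_id(partition_id: int, row_in_partition: int, offset: int = 0) -> int:
    """Mimic the documented bit layout of monotonically_increasing_id().

    Upper bits hold the partition ID, the lower 33 bits hold the row number
    within that partition. `offset` plays the role of the +10000 in the notebook.
    """
    return (partition_id << 33) + row_in_partition + offset

# Rows in partition 0 get consecutive IDs
print(spark_style_id(0, 3, offset=10000))  # 10003
# The first row of partition 1 jumps by 2**33
print(spark_style_id(1, 0, offset=10000))  # 8589944592
```

If gap-free keys matter, a common alternative is `row_number()` over an ordered window, at the cost of moving all rows through a single partition.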

- *Table 2: Company Dimension Table*

In [7]:
# Create Company dimension table
company_df = campus_company_df.select(
    "Company_Name",
    "Head_Office_Location",
    "Country"
).distinct()

# Add a Company_ID surrogate key column to the Company dimension table
company_df = company_df.withColumn("Company_ID", monotonically_increasing_id()+20000)
company_df.show()

+------------+--------------------+-----------+----------+
|Company_Name|Head_Office_Location|    Country|Company_ID|
+------------+--------------------+-----------+----------+
|    FACEBOOK|          MENLO PARK|        USA|     20000|
|     SAMSUNG|               SUWON|SOUTH KOREA|     20001|
|   COGNIZANT|             TEANECK|        USA|     20002|
|       EMC 2|           HOPKINTON|        USA|     20003|
|        KPMG|           AMSTERDAM|NETHERLANDS|     20004|
|      AKAMAI|           CAMBRIDGE|        USA|     20005|
|   MICROSOFT|             REDMOND|        USA|     20006|
|         TCS|              MUMBAI|      INDIA|     20007|
|    MU SIGMA|             CHICAGO|        USA|     20008|
|   ACCENTURE|              DUBLIN|    IRELAND|     20009|
|       WIPRO|           BENGALURU|      INDIA|     20010|
|   JP MORGAN|           NEW YORK |        USA|     20011|
|      VMWARE|           PALO ALTO|        USA|     20012|
|         HCL|               NOIDA|      INDIA|     2001

- *Table 3: Survey Dimension Table*

In [8]:
# Create Survey dimension table
survey_df = campus_college_df.select(
    "SURVEY_Name",
).distinct()
# Add a Survey_ID surrogate key column to the Survey dimension table
survey_df = survey_df.withColumn("Survey_ID", monotonically_increasing_id()+30000)

# Show Survey Table
survey_df.show()

+--------------+---------+
|   SURVEY_Name|Survey_ID|
+--------------+---------+
|           DNA|    30000|
|Mint NewsPaper|    30001|
|        Forbes|    30002|
+--------------+---------+



- *Table 4: Time Dimension Table*

In [9]:
# Collect the distinct dates from both the college and company source DataFrames
time_df = campus_college_df.select("Date").distinct().union(campus_company_df.select("Date").distinct()).distinct()

# Extract year, quarter, month, and day of the week from the date column & add Time_ID to time dimension table
time_df = time_df.withColumn("Year", year(time_df["Date"])) \
                 .withColumn("Quarter", quarter(time_df["Date"])) \
                 .withColumn("Month", month(time_df["Date"])) \
                 .withColumn("Day_Of_Week", dayofweek(time_df["Date"])) \
                 .withColumn("Time_ID", monotonically_increasing_id()+40000)

# Show Time Dimension table
time_df.show()

+----------+----+-------+-----+-----------+-------+
|      Date|Year|Quarter|Month|Day_Of_Week|Time_ID|
+----------+----+-------+-----+-----------+-------+
|2014-01-10|2014|      1|    1|          6|  40000|
|2013-01-10|2013|      1|    1|          5|  40001|
|2012-01-10|2012|      1|    1|          3|  40002|
|2012-10-06|2012|      4|   10|          7|  40003|
|2013-09-09|2013|      3|    9|          2|  40004|
|2013-05-21|2013|      2|    5|          3|  40005|
|2013-03-26|2013|      1|    3|          3|  40006|
|2014-09-26|2014|      3|    9|          6|  40007|
|2014-11-12|2014|      4|   11|          4|  40008|
|2013-09-19|2013|      3|    9|          5|  40009|
|2012-11-11|2012|      4|   11|          1|  40010|
|2013-02-02|2013|      1|    2|          7|  40011|
|2012-09-21|2012|      3|    9|          6|  40012|
|2012-03-30|2012|      1|    3|          6|  40013|
|2012-07-14|2012|      3|    7|          7|  40014|
|2013-02-04|2013|      1|    2|          2|  40015|
|2012-11-03|
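
To make the derived columns easier to read, here is a minimal plain-Python sketch of the same attributes. `time_attributes` is a hypothetical helper written for illustration. It assumes Spark's `dayofweek` convention of 1 = Sunday through 7 = Saturday, which is consistent with the rows shown above.

```python
from datetime import date

def time_attributes(d: date) -> dict:
    """Derive the time-dimension attributes for a single date."""
    return {
        "Year": d.year,
        # Months 1-3 -> quarter 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4
        "Quarter": (d.month - 1) // 3 + 1,
        "Month": d.month,
        # isoweekday(): Monday=1 .. Sunday=7; Spark's dayofweek(): Sunday=1 .. Saturday=7
        "Day_Of_Week": d.isoweekday() % 7 + 1,
    }

print(time_attributes(date(2014, 1, 10)))
# {'Year': 2014, 'Quarter': 1, 'Month': 1, 'Day_Of_Week': 6}
```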

6. *Create Fact Tables by Joining the Original Data with the Dimension Tables*

- *Table 5: Fact College Table*

In [10]:
# Create Fact College Table by joining campus_company DataFrame and various dimension tables

fact_College_df = campus_company_df.join(time_df, campus_company_df["Date"] == time_df["Date"],"inner") \
                                    .join(college_df, campus_company_df["College_Name"] == college_df["College_Name"],"inner") \
                                    .join(company_df, campus_company_df["Company_Name"] == company_df["Company_Name"]) \
                                    .select(
    "Time_ID",
    "College_ID",
    "Company_ID",   
    "Package",
    "Student_Selected",
    "Student_Participated",
    "Criteria"
).distinct()

# Add a Fact__College_ID surrogate key column to the Fact College table
fact_College_df = fact_College_df.withColumn("Fact__College_ID", monotonically_increasing_id()+50000)

# show Fact College Table
fact_College_df.show()

+-------+----------+----------+-------+----------------+--------------------+--------+----------------+
|Time_ID|College_ID|Company_ID|Package|Student_Selected|Student_Participated|Criteria|Fact__College_ID|
+-------+----------+----------+-------+----------------+--------------------+--------+----------------+
|  40019|     10009|     20013|    4.5|             260|                 500|      60|           50000|
|  40468|     10011|     20001|    7.0|              60|                 110|      80|           50001|
|  40375|     10013|     20019|    9.0|              35|                  55|      85|           50002|
|  40074|     10010|     20016|    7.5|              91|                 470|      70|           50003|
|  40224|     10008|     20008|    6.0|              17|                  53|      70|           50004|
|  40293|     10019|     20018|    3.5|             158|                 411|      65|           50005|
|  40232|     10003|     20009|    3.0|             184|        
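
The joins above swap each natural key (a name or a date) for the surrogate ID held in the matching dimension table. The plain-Python sketch below shows the same idea with dictionary lookups. The IDs are the ones displayed in the dimension tables, while the source rows, their measure values, and the helper `build_fact_rows` are made up for illustration.

```python
# Dimension lookups: natural key -> surrogate key (IDs as displayed in the dimension tables)
college_ids = {"IIT DELHI": 10004, "IIT KANPUR": 10008}
company_ids = {"SAMSUNG": 20001, "TCS": 20007}

# Hypothetical source rows; the measure values are invented for this sketch
source_rows = [
    {"College_Name": "IIT DELHI", "Company_Name": "TCS", "Student_Selected": 40},
    {"College_Name": "IIT KANPUR", "Company_Name": "SAMSUNG", "Student_Selected": 12},
    {"College_Name": "UNKNOWN COLLEGE", "Company_Name": "TCS", "Student_Selected": 5},
]

def build_fact_rows(rows):
    """Inner-join semantics: rows without a match in every dimension are dropped."""
    facts = []
    for row in rows:
        college_id = college_ids.get(row["College_Name"])
        company_id = company_ids.get(row["Company_Name"])
        if college_id is None or company_id is None:
            continue  # no matching dimension row, so the inner join discards it
        facts.append({
            "College_ID": college_id,
            "Company_ID": company_id,
            "Student_Selected": row["Student_Selected"],
        })
    return facts

fact_rows = build_fact_rows(source_rows)
print(fact_rows)
```

The third source row disappears because its college has no dimension entry, which is the same behaviour an inner join has on unmatched keys.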

- *Table 6: Fact Survey Table*

In [11]:
# Create Fact Survey Table by joining campus_college DataFrame and various dimension tables

fact_survey_df = campus_college_df.join(time_df, campus_college_df["Date"] == time_df["Date"],"inner") \
                                    .join(college_df, campus_college_df["College_Name"] == college_df["College_Name"],"inner") \
                                    .join(survey_df, campus_college_df["SURVEY_Name"] == survey_df["SURVEY_Name"]) \
                                    .select(
    "Time_ID",
    "College_ID",
    "Survey_ID",   
    "NO_OF_STUDENTS_FOREIGN",
    "NO_OF_STUDENTS_INDIAN",
    "Fees",
    "Rank"
).distinct()

# Add a Fact_Survey_ID surrogate key column to the Fact Survey table
fact_survey_df = fact_survey_df.withColumn("Fact_Survey_ID", monotonically_increasing_id()+60000)

# show Fact Survey Table
fact_survey_df.show()

+-------+----------+---------+----------------------+---------------------+------+----+--------------+
|Time_ID|College_ID|Survey_ID|NO_OF_STUDENTS_FOREIGN|NO_OF_STUDENTS_INDIAN|  Fees|Rank|Fact_Survey_ID|
+-------+----------+---------+----------------------+---------------------+------+----+--------------+
|  40001|     10009|    30002|                   307|                 2386| 75000|   2|         60000|
|  40000|     10019|    30001|                   205|                 1978| 85000|  17|         60001|
|  40001|     10004|    30002|                   611|                 1645| 90000|   4|         60002|
|  40002|     10005|    30000|                   120|                 1600| 85000|   8|         60003|
|  40002|     10009|    30000|                   120|                 1600| 75000|   1|         60004|
|  40001|     10001|    30002|                   821|                 2689| 72000|   1|         60005|
|  40002|     10002|    30000|                   200|                 268

7. *Query the Fact Tables and Save the Results as CSV Files on Local Storage*

---

- *QUERY 1*

In [12]:
# QUERY 1 :  year wise college ranking

# Join Fact_survey_df with time_df to get Year and join with college_df to get College_name
ranking_year_df = fact_survey_df \
    .join(time_df, fact_survey_df["Time_ID"] == time_df["Time_ID"], "inner") \
    .join(college_df, fact_survey_df["College_ID"] == college_df["College_ID"], "inner") \
    .select("Year", "College_Name", "Rank").orderBy("Year","Rank")

# showing result
ranking_year_df.show()

+----+--------------------+----+
|Year|        College_Name|Rank|
+----+--------------------+----+
|2012|          IIT MUMBAI|   1|
|2012|           IIT DELHI|   2|
|2012|       IIT KHARAGPUR|   3|
|2012|         IIT ROORKEE|   4|
|2012|          IIT KANPUR|   5|
|2012|          IIT MADRAS|   6|
|2012|BIRLA INSTITUTE O...|   7|
|2012|        IIT GUWAHATI|   8|
|2012|      SRM UNIVERSITY|   9|
|2012|THAPAR INSTITUTE ...|  10|
|2012|VELLORE INSTITUTE...|  11|
|2012|MANIPAL INSTITUTE...|  12|
|2012|       IIIT HYDERBAD|  13|
|2012|INSTITUTE OF TECH...|  14|
|2012|DHIRUBHAI AMBANI ...|  15|
|2012|PSG COLLEGE OF TE...|  16|
|2012|COLLEGE OF ENGINE...|  17|
|2012|VISHWESHWARAYYA T...|  18|
|2012|     MNNIT ALLAHABAD|  19|
|2012|DELHI COLLEGE OF ...|  20|
+----+--------------------+----+
only showing top 20 rows



- *Save results of above query on local storage in CSV format*

In [None]:
# Save Query 1 to CSV File - Save REPORTS
ranking_year_df.write.mode("overwrite").csv(r"C:\MINI-PROJECT\Reports\Ranking_Yearwise", header=True)

---

- *QUERY 2*

In [14]:
# QUERY 2 :  Year Wise College Wise Percentage Selection

# Join fact_College_df with time_df, college_df, and company_df to get Year, College_Name, and Company_Name 
selection_df = fact_College_df \
    .join(time_df, fact_College_df["Time_ID"] == time_df["Time_ID"], "inner") \
    .join(college_df, fact_College_df["College_ID"] == college_df["College_ID"], "inner") \
    .join(company_df, fact_College_df["Company_ID"] == company_df["Company_ID"], "inner") \
    .select("Year", "College_Name", "Company_Name", "Student_Selected", "Student_Participated")

# Calculate selection percentage
selection_df = selection_df \
    .withColumn("Selection_Percentage", (col("Student_Selected") / col("Student_Participated")) * 100)

# Select Year, college_name, Company_name, Selection_percentage order by Year and College_name
selection_df = selection_df.select("Year", "College_Name", "Company_Name","Selection_Percentage").orderBy("Year","College_Name")

# Show the result
selection_df.show()



+----+--------------------+------------+--------------------+
|Year|        College_Name|Company_Name|Selection_Percentage|
+----+--------------------+------------+--------------------+
|2012|BIRLA INSTITUTE O...|   HONEYWELL|   68.18181818181817|
|2012|BIRLA INSTITUTE O...|    MU SIGMA|   33.33333333333333|
|2012|BIRLA INSTITUTE O...|     INFOSYS|   70.17543859649122|
|2012|BIRLA INSTITUTE O...|         TCS|   55.55555555555556|
|2012|BIRLA INSTITUTE O...|       IGATE|   53.84615384615385|
|2012|BIRLA INSTITUTE O...|      VMWARE|   57.14285714285714|
|2012|BIRLA INSTITUTE O...|        KPMG|                65.0|
|2012|BIRLA INSTITUTE O...|     SAMSUNG|   47.05882352941176|
|2012|BIRLA INSTITUTE O...|       CISCO|   33.33333333333333|
|2012|BIRLA INSTITUTE O...|       WIPRO|   66.66666666666666|
|2012|BIRLA INSTITUTE O...|   COGNIZANT|   83.33333333333334|
|2012|BIRLA INSTITUTE O...|   JP MORGAN|  42.857142857142854|
|2012|BIRLA INSTITUTE O...|   CAPGEMINI|   54.54545454545454|
|2012|BI
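
The percentage itself is simple arithmetic, sketched here in plain Python. `selection_percentage` is a hypothetical helper, and the sample counts (260 selected out of 500) are taken from the first row of the Fact College table shown earlier. The guard for zero participants is an assumption about how such rows should be treated; it returns `None`, which mirrors the NULL that Spark produces for a zero divisor when ANSI mode is off.

```python
import math
from typing import Optional

def selection_percentage(selected: int, participated: int) -> Optional[float]:
    """Share of participating students who were selected, in percent."""
    if participated == 0:
        # No participants: the ratio is undefined, so report a missing value
        return None
    return (selected / participated) * 100

pct = selection_percentage(260, 500)
print(round(pct, 2))
print(selection_percentage(5, 0))
```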

- *Save results of above query on local storage in CSV format*

In [None]:
# Save Query 2 to CSV File - Save REPORTS
selection_df.write.mode("overwrite").csv(r"C:\MINI-PROJECT\Reports\Placement_Percent", header=True)

---

- *QUERY 3*

In [16]:
# Query 3 : Year Wise Quarter Wise Placement
from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum

# Join fact_College_df with time_df and college_df to get Year, Quarter, and College_Name
quarterly_placement_df = fact_College_df \
    .join(time_df, fact_College_df["Time_ID"] == time_df["Time_ID"], "inner") \
    .join(college_df, fact_College_df["College_ID"] == college_df["College_ID"], "inner") \
    .select("Year", "Quarter", "College_Name", "Student_Selected")

# Group by Year, Quarter, and College_Name, then calculate the total number of students selected
quarterly_placement_df = quarterly_placement_df.groupBy("Year", "Quarter", "College_Name") \
    .agg(spark_sum("Student_Selected").alias("Total_Students_Selected")).orderBy("Year")

# Show the result
quarterly_placement_df.show()


+----+-------+--------------------+-----------------------+
|Year|Quarter|        College_Name|Total_Students_Selected|
+----+-------+--------------------+-----------------------+
|2012|      4|      SRM UNIVERSITY|                    755|
|2012|      3|DHIRUBHAI AMBANI ...|                   1347|
|2012|      4|DHIRUBHAI AMBANI ...|                    762|
|2012|      2|        IIT GUWAHATI|                    265|
|2012|      3|     MNNIT ALLAHABAD|                    932|
|2012|      4|        IIT GUWAHATI|                    194|
|2012|      2|      SRM UNIVERSITY|                    955|
|2012|      3|        IIT GUWAHATI|                    563|
|2012|      3|BIRLA INSTITUTE O...|                   1510|
|2012|      2|          IIT KANPUR|                    653|
|2012|      1|       IIT KHARAGPUR|                    286|
|2012|      2|PSG COLLEGE OF TE...|                   1035|
|2012|      1|        IIT GUWAHATI|                    343|
|2012|      1|COLLEGE OF ENGINE...|     
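
The group-and-sum step can be pictured with a small plain-Python sketch. The rows below are hypothetical and only illustrate the mechanics. Because the query orders by `Year` alone, the order of quarters and colleges within a year is unspecified, as the output above shows. The sketch sorts on the full key instead, which is one way to get a stable listing.

```python
from collections import defaultdict

# Hypothetical (Year, Quarter, College_Name, Student_Selected) rows
rows = [
    (2012, 1, "COLLEGE A", 10),
    (2012, 1, "COLLEGE A", 15),
    (2012, 2, "COLLEGE A", 7),
    (2013, 1, "COLLEGE B", 20),
]

# Accumulate Student_Selected per (Year, Quarter, College_Name) group
totals = defaultdict(int)
for year, quarter, college, selected in rows:
    totals[(year, quarter, college)] += selected

# Sort on the whole key so quarters and colleges come out in a stable order
for key in sorted(totals):
    print(key, totals[key])
```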

- *Save results of above query on local storage in CSV format*

In [None]:
# Save Query 3 to CSV File - Save REPORTS
quarterly_placement_df.write.mode("overwrite").csv(r"C:\MINI-PROJECT\Reports\Quarterly_placement", header=True)

---

- *QUERY 4*

In [18]:
# Query 4 : Total Students Selected per Company
from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum

# Join fact_College_df with time_df and company_df to get Year, Company_Name, Student_Selected, and Package
yearly_selected_df = fact_College_df \
    .join(time_df, fact_College_df["Time_ID"] == time_df["Time_ID"], "inner") \
    .join(company_df, fact_College_df["Company_ID"] == company_df["Company_ID"], "inner") \
    .select("Year", "Company_Name", "Student_Selected", "Package")

# Group by Year, Company_Name, and Package, then total the students selected
yearly_selected_df = yearly_selected_df.groupBy("Year", "Company_Name", "Package") \
    .agg(
        spark_sum("Student_Selected").alias("Total_Students_Selected"),
    ).orderBy("Year")

# Show the result
yearly_selected_df.show()

+----+------------+-------+-----------------------+
|Year|Company_Name|Package|Total_Students_Selected|
+----+------------+-------+-----------------------+
|2012|       EMC 2|    6.0|                    626|
|2012|      VMWARE|    7.0|                    837|
|2012|    FACEBOOK|    8.5|                     48|
|2012|       WIPRO|   3.25|                   3735|
|2012|   COGNIZANT|   3.15|                   5705|
|2012|       IGATE|   3.15|                   7213|
|2012|     INFOSYS|    3.5|                   5007|
|2012|       CISCO|    9.5|                    272|
|2012|        KPMG|    5.0|                   1201|
|2012|         HCL|    4.5|                   6583|
|2012|   ACCENTURE|    3.0|                   2263|
|2012|      GOOGLE|    8.0|                     75|
|2012|   CAPGEMINI|    9.0|                    377|
|2012|     SAMSUNG|    7.0|                    892|
|2012|   HONEYWELL|    7.5|                   2062|
|2012|   JP MORGAN|    4.5|                   1016|
|2012|   MIC

- *Save results of above query on local storage in CSV format*

In [None]:
# Save Query 4 to CSV File - Save REPORTS
yearly_selected_df.write.mode("overwrite").csv(r"C:\MINI-PROJECT\Reports\Student_Selected_per_Company", header=True)

---

- *QUERY 5*

In [20]:
# QUERY 5 : College Ranking Year wise
ranking_college_df = fact_survey_df \
    .join(time_df, fact_survey_df["Time_ID"] == time_df["Time_ID"], "inner") \
    .join(college_df, fact_survey_df["College_ID"] == college_df["College_ID"], "inner") \
    .select("Year", "College_Name", "Rank").orderBy("College_Name","Year")

# show result
ranking_college_df.show()

+----+--------------------+----+
|Year|        College_Name|Rank|
+----+--------------------+----+
|2012|BIRLA INSTITUTE O...|   7|
|2013|BIRLA INSTITUTE O...|  12|
|2014|BIRLA INSTITUTE O...|   8|
|2012|COLLEGE OF ENGINE...|  17|
|2013|COLLEGE OF ENGINE...|  15|
|2014|COLLEGE OF ENGINE...|  14|
|2012|DELHI COLLEGE OF ...|  20|
|2013|DELHI COLLEGE OF ...|  18|
|2014|DELHI COLLEGE OF ...|  19|
|2012|DHIRUBHAI AMBANI ...|  15|
|2013|DHIRUBHAI AMBANI ...|  13|
|2014|DHIRUBHAI AMBANI ...|  17|
|2012|       IIIT HYDERBAD|  13|
|2013|       IIIT HYDERBAD|  10|
|2014|       IIIT HYDERBAD|   6|
|2012|           IIT DELHI|   2|
|2013|           IIT DELHI|   4|
|2014|           IIT DELHI|   3|
|2012|        IIT GUWAHATI|   8|
|2013|        IIT GUWAHATI|   7|
+----+--------------------+----+
only showing top 20 rows



- *Save results of above query on local storage in CSV format*

In [None]:
# Save Query 5 to CSV File - Save REPORTS
ranking_college_df.write.mode("overwrite").csv(r"C:\MINI-PROJECT\Reports\College_Ranking", header=True)

8. *End Spark Session*

In [22]:
# Stop the SparkSession
spark.stop()

<center>--- END of Code ---</center>