In [None]:
#Question 2. 
''' Consider two dataset “Company.txt” and “College.txt”
    a. Create two table structure as follows by using case class:
        i. College table:
            dates:String,
            college:String,
            location:String,
            number_of_indian_students:String,
            number_of_foreign_students:String,
            fees:String , 
            survey_name:String, 
            rank:Int
        ii. Company table:
            dates:String,
            college:String,
            company:String,
            location:String,
            country:String,
            package:Double ,
            selected:Int, 
            participated:Int, 
            criteria:Int
            
    b. Read these two txt files into RDD and map the RDD with these table structure.
    c. Get the result year wise college ranking
    d. List down the College name and Total Numbers of Students including Indian & Foreigner who
    conducted the campus drive in “TCS” and “Capgemini”.
    e. List down the College name and ranking whose student have got opportunities in “Google” and
    “Facebook”
    f. List down the Company names with number of campus drives.
    g. List down the colleges state wise with total no. of Student Selection.
    h. List down the full details of College with Campus drive details who have highest number of Foreign
    Students.
    i. List down top 5 college name with their number of Selected Students
    j. List down the College Name with Fees whose student are getting package above of 10 LPA package
    k. List down the College Name with the Rank who have the highest criteria to get a job
'''

In [39]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col, year, to_date, split

In [40]:
spark = SparkSession.builder.appName("Spark Assessment2").getOrCreate()

In [42]:
College = StructType([
    StructField("dates", StringType(), True),
    StructField("college", StringType(), True),
    StructField("location", StringType(), True),
    StructField("number_of_indian_students", StringType(), True),
    StructField("number_of_foreign_students", StringType(), True),
    StructField("fees", StringType(), True),
    StructField("survey_name", StringType(), True),
    StructField("rank", IntegerType(), True)
])

Company = StructType([
    StructField("dates", StringType(), True),
    StructField("college", StringType(), True),
    StructField("company", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("package", FloatType(), True),
    StructField("selected", IntegerType(), True),
    StructField("participated", IntegerType(), True),
    StructField("criteria", IntegerType(), True)
])

In [43]:
collegeRDD = spark.sparkContext.textFile("College.txt")
companyRDD = spark.sparkContext.textFile("Company.txt")

collegeDF = spark.createDataFrame(collegeRDD.map(lambda x: x.split("\t")), schema=College)
companyDF = spark.createDataFrame(companyRDD.map(lambda x: x.split("\t")), schema=Company)


In [45]:
collegeDF = collegeDF.withColumn("year", year(to_date(col("dates"), "yyyy-MM-dd")))
collegeRankingDF = collegeDF.groupBy("year", "college") \
    .agg({"rank": "avg"}) \
    .orderBy("year", "avg(rank)")

In [48]:
from pyspark.sql import functions as F

companyDF = companyDF.filter(col("company").isin(["TCS", "Capgemini"]))
collegeStudentCountDF = companyDF.join(collegeDF, on="college", how="inner") \
    .groupBy("college") \
    .agg((F.col("number_of_indian_students").cast("int") + F.col("number_of_foreign_students").cast("int")).alias("total_students")) \
    .select("college", "total_students")

AnalysisException: [MISSING_AGGREGATION] The non-aggregating expression "number_of_indian_students" is based on columns which are not participating in the GROUP BY clause.
Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(number_of_indian_students)" if you do not care which of the values within a group is returned.;
Aggregate [college#104], [college#104, (cast(number_of_indian_students#90 as int) + cast(number_of_foreign_students#91 as int)) AS total_students#269]
+- Project [college#104, dates#103, company#105, location#106, country#107, package#108, selected#109, participated#110, criteria#111, dates#87, location#89, number_of_indian_students#90, number_of_foreign_students#91, fees#92, survey_name#93, rank#94, year#141]
   +- Join Inner, (college#104 = college#88)
      :- Filter company#105 IN (TCS,Capgemini)
      :  +- Filter company#105 IN (TCS,Capgemini)
      :     +- Filter company#105 IN (TCS,Capgemini)
      :        +- LogicalRDD [dates#103, college#104, company#105, location#106, country#107, package#108, selected#109, participated#110, criteria#111], false
      +- Project [dates#87, college#88, location#89, number_of_indian_students#90, number_of_foreign_students#91, fees#92, survey_name#93, rank#94, year(to_date(dates#87, Some(yyyy-MM-dd), Some(Asia/Calcutta), false)) AS year#141]
         +- Project [dates#87, college#88, location#89, number_of_indian_students#90, number_of_foreign_students#91, fees#92, survey_name#93, rank#94, year(to_date(dates#87, Some(yyyy-MM-dd), Some(Asia/Calcutta), false)) AS year#121]
            +- LogicalRDD [dates#87, college#88, location#89, number_of_indian_students#90, number_of_foreign_students#91, fees#92, survey_name#93, rank#94], false


In [None]:
companyDriveCountDF = companyDF.groupBy("company").count() \
    .select("company", "count") \
    .orderBy("count", ascending=False)

In [None]:
collegeDF = collegeDF.withColumn("state", split(col("location"), ",")[1])  # Assuming comma-separated location format
selectedStudentsDF = companyDF.join(collegeDF, on="college", how="inner") \
    .filter(col("selected") > 0)
stateStudentCountDF = selectedStudentsDF.groupBy("state") \
    .agg(col("selected").sum().alias("total_selections")) \
    .orderBy("total_selections", ascending=False)

In [None]:
maxForeignStudents = collegeDF.select(col("number_of_foreign_students").cast("int")).rdd.max()[0]
collegeWithMaxForeignStudentsDF = collegeDF.filter(col("number_of_foreign_students").cast("int") == maxForeignStudents)
joinedDetailsDF = collegeWithMaxForeignStudentsDF.join(companyDF, on="college", how="full")  # Full join to include colleges without drives

In [None]:
topSelectedStudentsDF = companyDF.join(collegeDF, on="college", how="inner") \
    .groupBy("college") \
    .agg(col("selected").sum().alias("total_selected")) \
    .orderBy("total_selected", ascending=False) \
    .limit(5)

In [None]:
packageAbove10DF = companyDF.join(collegeDF, on="college", how="inner") \
    .filter(col("package") > 10) \
    .select("college", "fees")

In [None]:
maxCriteria = companyDF.select(col("criteria").cast("int")).rdd.max()[0]
collegeWithMaxCriteriaDF = collegeDF.filter(col("criteria") == maxCriteria)

In [None]:
collegeDF.printSchema()
companyDF.printSchema()