In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [14]:
from pyspark.sql import SparkSession
import pandas as pd

# Create Spark session
spark = SparkSession.builder \
    .appName("LocalSparkExample") \
    .master("local[*]") \
    .getOrCreate()

# Step 1: Use RAW GitHub CSV link
url = "https://raw.githubusercontent.com/makhan010385/DataBricks-/main/merged_final.csv"

# Step 2: Load CSV into Pandas DataFrame
pdf = pd.read_csv(url)

# Step 3: Convert Pandas → Spark DataFrame
df = spark.createDataFrame(pdf)

# Step 4: Show results
df.show(5)
df.printSchema()


+----------+----------------+--------------------+---------------+-------------+-----------+------------+---------------+----+----+
|Unnamed: 0|          author|           statement|         source|         date|     target|BinaryTarget|BinaryNumTarget|Fake|Real|
+----------+----------------+--------------------+---------------+-------------+-----------+------------+---------------+----+----+
|         0|Marta Campabadal|“Netflix estrenó ...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE| NaN|
|         1|  Louis Jacobson|Says that under h...|      Joe Biden|June 29, 2023|mostly-true|        REAL|              1| NaN|REAL|
|         2|    Jeff Cercone|"ONU ordena despe...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE| NaN|
|         3|      Sara Swann|NASA warns of “in...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE| NaN|
|         4|    Jeff Cercone|Video suggests CO...|Instagram posts|June 29, 2

#Display the summary statistics

In [None]:
# Display the summary statistics of the data
df.describe().show()

+-------+------------------+------------+--------------------+--------------------+--------------------+-------------+-------------+-------------------+-------------------+--------------------+
|summary|               _c0|         _c1|                 _c2|                 _c3|                 _c4|          _c5|          _c6|                _c7|                _c8|                 _c9|
+-------+------------------+------------+--------------------+--------------------+--------------------+-------------+-------------+-------------------+-------------------+--------------------+
|  count|              6001|        6001|                6001|                6001|                6001|         6001|         6001|               6001|               5253|                 573|
|   mean|            2999.5|        NULL|                NULL|                 0.0|                 0.0|          2.0|         NULL|0.12988826815642457| 0.4090909090909091| 0.16666666666666666|
| stddev|1732.1951391226105|  

#Convert Spark Dataframe to Pandas

In [None]:
import pandas as pd
pandas_df =df.toPandas()
pandas_df.head()
pandas_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6000 entries, 0 to 5999
Data columns (total 10 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   Unnamed: 0       6000 non-null   int32 
 1   author           6000 non-null   object
 2   statement        6000 non-null   object
 3   source           6000 non-null   object
 4   date             6000 non-null   object
 5   target           6000 non-null   object
 6   BinaryTarget     6000 non-null   object
 7   BinaryNumTarget  6000 non-null   object
 8   Fake             5252 non-null   object
 9   Real             572 non-null    object
dtypes: int32(1), object(9)
memory usage: 445.4+ KB


#Now we can use pandas operations on the pandas_df dataframe.

##Data Transformation
##Select specific columns

In [None]:
# Select specific columns
df.select("Unnamed: 0", "BinaryNumTarget").show(5)

+----------+---------------+
|Unnamed: 0|BinaryNumTarget|
+----------+---------------+
|         0|              0|
|         1|              1|
|         2|              0|
|         3|              0|
|         4|              0|
+----------+---------------+
only showing top 5 rows



#Rename a column

In [None]:
df_renamed =df.withColumnRenamed("Unnamed: 0", "id")
df_renamed.show(5)

+---+----------------+--------------------+---------------+-------------+-----------+------------+---------------+----+----+
| id|          author|           statement|         source|         date|     target|BinaryTarget|BinaryNumTarget|Fake|Real|
+---+----------------+--------------------+---------------+-------------+-----------+------------+---------------+----+----+
|  0|Marta Campabadal|“Netflix estrenó ...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|
|  1|  Louis Jacobson|Says that under h...|      Joe Biden|June 29, 2023|mostly-true|        REAL|              1|NULL|REAL|
|  2|    Jeff Cercone|"""ONU ordena des...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|
|  3|      Sara Swann|NASA warns of “in...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|
|  4|    Jeff Cercone|Video suggests CO...|Instagram posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|


#Filter data based on a condition

In [None]:
# Filter data based on a condition
df_renamed.filter(df_renamed.id > 10).show(5)

+---+----------------+--------------------+---------------+-------------+----------+------------+---------------+----+----+
| id|          author|           statement|         source|         date|    target|BinaryTarget|BinaryNumTarget|Fake|Real|
+---+----------------+--------------------+---------------+-------------+----------+------------+---------------+----+----+
| 11|      Nuria Diaz|John F. Kennedy e...|Instagram posts|June 27, 2023|pants-fire|        FAKE|              0|FAKE|NULL|
| 12|   Maria Briceño|"""Se filtran aud...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 13|Marta Campabadal|Fotos muestran ro...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 14|    Jeff Cercone|Audio is of “the ...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 15|   Maria Briceño|"""Imagenes de lo...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
+---+---

In [None]:
# Filter data based on a condition
# We & opprator

df_renamed.filter((df_renamed.id > 10)&(df_renamed.id < 20)).show(5)

# With between ofpp

df_renamed.filter(df_renamed.id.between(12,80)).show(5)

+---+----------------+--------------------+---------------+-------------+----------+------------+---------------+----+----+
| id|          author|           statement|         source|         date|    target|BinaryTarget|BinaryNumTarget|Fake|Real|
+---+----------------+--------------------+---------------+-------------+----------+------------+---------------+----+----+
| 11|      Nuria Diaz|John F. Kennedy e...|Instagram posts|June 27, 2023|pants-fire|        FAKE|              0|FAKE|NULL|
| 12|   Maria Briceño|"""Se filtran aud...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 13|Marta Campabadal|Fotos muestran ro...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 14|    Jeff Cercone|Audio is of “the ...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 15|   Maria Briceño|"""Imagenes de lo...| Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
+---+---

In [None]:
df_renamed.show(10)

+---+--------------------+--------------------+--------------------+-------------+-----------+------------+---------------+----+----+
| id|              author|           statement|              source|         date|     target|BinaryTarget|BinaryNumTarget|Fake|Real|
+---+--------------------+--------------------+--------------------+-------------+-----------+------------+---------------+----+----+
|  0|    Marta Campabadal|“Netflix estrenó ...|      Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|
|  1|      Louis Jacobson|Says that under h...|           Joe Biden|June 29, 2023|mostly-true|        REAL|              1|NULL|REAL|
|  2|        Jeff Cercone|"""ONU ordena des...|      Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|
|  3|          Sara Swann|NASA warns of “in...|      Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|
|  4|        Jeff Cercone|Video suggests CO...|     Instagram 

#Some more filtering examples:

In [None]:
df_renamed.filter((df_renamed.author == 'Marta Campabadal')).show(5)

+---+----------------+--------------------+--------------+-------------+----------+------------+---------------+----+----+
| id|          author|           statement|        source|         date|    target|BinaryTarget|BinaryNumTarget|Fake|Real|
+---+----------------+--------------------+--------------+-------------+----------+------------+---------------+----+----+
|  0|Marta Campabadal|“Netflix estrenó ...|Facebook posts|June 29, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 10|Marta Campabadal|Los cinco tripula...|Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 13|Marta Campabadal|Fotos muestran ro...|Facebook posts|June 27, 2023|     FALSE|        FAKE|              0|FAKE|NULL|
| 59|Marta Campabadal|“Arma que EE.UU. ...|Facebook posts|   • June 20,|     FALSE|        FAKE|              0|FAKE|NULL|
| 75|Marta Campabadal|"""Panico en Mosc...|Facebook posts|June 19, 2023|pants-fire|        FAKE|              0|FAKE|NULL|
+---+-----------

In [None]:
df_renamed.filter((df_renamed.author.like('Louis%'))).show(5)

+---+--------------+--------------------+--------------------+--------------+------------+------------+---------------+----+----+
| id|        author|           statement|              source|          date|      target|BinaryTarget|BinaryNumTarget|Fake|Real|
+---+--------------+--------------------+--------------------+--------------+------------+------------+---------------+----+----+
|  1|Louis Jacobson|Says that under h...|           Joe Biden| June 29, 2023| mostly-true|        REAL|              1|NULL|REAL|
| 21|Louis Jacobson|“Median income in...|           Tim Scott| June 26, 2023|   half-true|        REAL|              1|NULL|REAL|
|114|Louis Jacobson|“The family separ...|          Mike Pence|  June 8, 2023|       FALSE|        FAKE|              0|FAKE|NULL|
|131|Louis Jacobson|"Having ""biologi...|         Nikki Haley|  June 6, 2023|       FALSE|        FAKE|              0|FAKE|NULL|
|139|Louis Jacobson|"""Every study ha...| “it puts more pe...|Kevin McCarthy|June 1, 2023|

In [None]:
df_renamed.filter((df_renamed.author.endswith('son'))).show(5)

+---+--------------+--------------------+--------------------+--------------+------------+------------+---------------+----+----+
| id|        author|           statement|              source|          date|      target|BinaryTarget|BinaryNumTarget|Fake|Real|
+---+--------------+--------------------+--------------------+--------------+------------+------------+---------------+----+----+
|  1|Louis Jacobson|Says that under h...|           Joe Biden| June 29, 2023| mostly-true|        REAL|              1|NULL|REAL|
| 21|Louis Jacobson|“Median income in...|           Tim Scott| June 26, 2023|   half-true|        REAL|              1|NULL|REAL|
|114|Louis Jacobson|“The family separ...|          Mike Pence|  June 8, 2023|       FALSE|        FAKE|              0|FAKE|NULL|
|131|Louis Jacobson|"Having ""biologi...|         Nikki Haley|  June 6, 2023|       FALSE|        FAKE|              0|FAKE|NULL|
|139|Louis Jacobson|"""Every study ha...| “it puts more pe...|Kevin McCarthy|June 1, 2023|

#Join DataFrames

In [18]:
# Sorting data using orderBy() method
sorted_df = df.orderBy("author").show(5)

+----------+------------+--------------------+-----------------+--------------+-----------+------------+---------------+----+----+
|Unnamed: 0|      author|           statement|           source|          date|     target|BinaryTarget|BinaryNumTarget|Fake|Real|
+----------+------------+--------------------+-----------------+--------------+-----------+------------+---------------+----+----+
|       327|Aarón Torres|“Fentanyl is the ...|      Greg Abbott|April 24, 2023|mostly-true|        REAL|              1| NaN|REAL|
|      3426|Aarón Torres|When discussing e...|Brandon Creighton|March 30, 2023|  half-true|        REAL|              1| NaN| NaN|
|       426|Aarón Torres|When discussing e...|Brandon Creighton|March 30, 2023|  half-true|        REAL|              1| NaN|REAL|
|      3327|Aarón Torres|“Fentanyl is the ...|      Greg Abbott|April 24, 2023|mostly-true|        REAL|              1| NaN| NaN|
|       469|Aarón Torres|"Businesses large...|      Greg Abbott|March 20, 2023|  ha

#Count articles per author by sql

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start Spark session
spark = SparkSession.builder.appName("AggregationExample").getOrCreate()

# Load CSV
df = spark.createDataFrame(pdf)

# Group by Author and count
df_agg = df.groupBy("author").agg(
    F.count("*").alias("article_count")
)

# Sort by highest count
df_agg = df_agg.orderBy(F.desc("article_count"))

df_agg.show(10)


+-----------------+-------------+
|           author|article_count|
+-----------------+-------------+
|   Ciara O'Rourke|         1732|
|     Jeff Cercone|          372|
|Gabrielle Settles|          340|
|    Tom Kertscher|          304|
|   Madison Czopek|          280|
|      Andy Nguyen|          268|
|   Louis Jacobson|          240|
|  Loreben Tuquero|          228|
|      Amy Sherman|          216|
|       Sara Swann|          196|
+-----------------+-------------+
only showing top 10 rows



#Count distinct sources per author

In [None]:
df.groupBy("author").agg(
    F.countDistinct("source").alias("unique_sources")
).show(10)

+---------------+--------------+
|         author|unique_sources|
+---------------+--------------+
|  Jon Greenberg|            15|
|  Liam Halawith|             4|
| Sydney Carruth|             1|
|    Amy Sherman|            28|
|   Warren Fiske|            15|
|    Grace Abels|            10|
|Faithlyn Graham|             1|
|   Jill Terreri|            11|
| Vanessa Swales|             7|
|   Blake Farmer|             1|
+---------------+--------------+
only showing top 10 rows



#Multiple aggregations in one go

In [None]:
df.groupBy("author").agg(
    F.count("*").alias("total_articles"),
    F.countDistinct("source").alias("unique_sources"),
    F.first("date").alias("first_article_date"),
    F.last("date").alias("last_article_date")
).show(10)

+-------------+--------------+--------------+--------------------+-------------------+
|       author|total_articles|unique_sources|  first_article_date|  last_article_date|
+-------------+--------------+--------------+--------------------+-------------------+
| Aarón Torres|            12|             2|      March 30, 2023|     March 20, 2023|
| Alan Hovorka|             4|             1|     August 12, 2022|    August 12, 2022|
| Alexis Waiss|            16|             3|    October 18, 2022|   November 2, 2022|
|Amanda Boring|             4|             1|    October 18, 2022|   October 18, 2022|
|  Amy Sherman|           216|            28| a total mess. Th...| September 30, 2022|
|  Andy Nguyen|           268|            11| employee passed ...|     August 1, 2022|
|    Ben Wells|             4|             1|    October 31, 2022|   October 31, 2022|
| Blake Farmer|             4|             1| America First Legal|America First Legal|
| Cameron Carr|            16|             

#Using SQL functions with .select()

In [None]:
df.select(
    F.count("*").alias("total_rows"),
    F.countDistinct("author").alias("unique_authors")
).show(5)

+----------+--------------+
|total_rows|unique_authors|
+----------+--------------+
|      6000|            71|
+----------+--------------+



#Register as SQL table and run SQL-style aggregation

In [None]:
df.createOrReplaceTempView("mydata")

spark.sql("""
    SELECT author, COUNT(*) AS article_count
    FROM mydata
    GROUP BY author
    ORDER BY article_count DESC
    LIMIT 10
""").show()


+-----------------+-------------+
|           author|article_count|
+-----------------+-------------+
|   Ciara O'Rourke|         1732|
|     Jeff Cercone|          372|
|Gabrielle Settles|          340|
|    Tom Kertscher|          304|
|   Madison Czopek|          280|
|      Andy Nguyen|          268|
|   Louis Jacobson|          240|
|  Loreben Tuquero|          228|
|      Amy Sherman|          216|
|       Sara Swann|          196|
+-----------------+-------------+



In [20]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Load CSV
df = spark.createDataFrame(pdf)
df.printSchema()

root
 |-- Unnamed: 0: long (nullable = true)
 |-- author: string (nullable = true)
 |-- statement: string (nullable = true)
 |-- source: string (nullable = true)
 |-- date: string (nullable = true)
 |-- target: string (nullable = true)
 |-- BinaryTarget: string (nullable = true)
 |-- BinaryNumTarget: long (nullable = true)
 |-- Fake: string (nullable = true)
 |-- Real: string (nullable = true)



In [None]:
from pyspark.sql.functions import when

# Example: create binary label (1 if True, 0 if False)
df = df.withColumn("label", when(df["BinaryTarget"] == "REAL", 1).otherwise(0))

In [None]:
df.show(5)

+----------+----------------+--------------------+---------------+-------------+-----------+------------+---------------+----+----+-----+
|Unnamed: 0|          author|           statement|         source|         date|     target|BinaryTarget|BinaryNumTarget|Fake|Real|label|
+----------+----------------+--------------------+---------------+-------------+-----------+------------+---------------+----+----+-----+
|         0|Marta Campabadal|“Netflix estrenó ...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|    0|
|         1|  Louis Jacobson|Says that under h...|      Joe Biden|June 29, 2023|mostly-true|        REAL|              1|NULL|REAL|    1|
|         2|    Jeff Cercone|"""ONU ordena des...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|    0|
|         3|      Sara Swann|NASA warns of “in...| Facebook posts|June 29, 2023|      FALSE|        FAKE|              0|FAKE|NULL|    0|
|         4|    Jeff Cercone|Video

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Convert categorical text into numeric indexes
author_indexer = StringIndexer(inputCol="author", outputCol="author_index")
source_indexer = StringIndexer(inputCol="source", outputCol="source_index")

# Assemble features into one vector
assembler = VectorAssembler(
    inputCols=["author_index", "source_index"],  # add more numeric features if available
    outputCol="features"
)

In [None]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")

# Build pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[author_indexer, source_indexer, assembler, lr])

# Train model
model = pipeline.fit(train_data)


In [None]:
predictions = model.transform(test_data)
predictions.select("Author", "Source", "label", "prediction", "probability").show(10)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")
print("Test AUC:", evaluator.evaluate(predictions))


+--------------------+---------------+-----+----------+--------------------+
|              Author|         Source|label|prediction|         probability|
+--------------------+---------------+-----+----------+--------------------+
|        Jeff Cercone| Facebook posts|    0|       0.0|[0.94604377444286...|
|       Tom Kertscher| Facebook posts|    0|       0.0|[0.93984929247315...|
|Sofia Bliss-Carra...|Vivek Ramaswamy|    1|       0.0|[0.54531579949987...|
|    Marta Campabadal| Facebook posts|    0|       0.0|[0.89235204140892...|
|       Tom Kertscher|Instagram posts|    0|       0.0|[0.93946411269795...|
|       Tom Kertscher|Instagram posts|    0|       0.0|[0.93946411269795...|
|Sofia Bliss-Carra...|      Joe Biden|    0|       0.0|[0.66775216157029...|
|      Ciara O'Rourke| Facebook posts|    0|       0.0|[0.94891074186131...|
|       Tom Kertscher| Facebook posts|    0|       0.0|[0.93984929247315...|
|       Maria Briceño| Facebook posts|    0|       0.0|[0.88669017492780...|

#TF–IDF Approach (Term Frequency – Inverse Document Frequency)
#TF–IDF gives you a vector representation of text based on word importance.

#Steps:
##Tokenize — split "Statement" into words.

##Remove stopwords — drop common words like “the”, “is”, “and”.

##HashingTF — map words to term frequency vectors.

##IDF — scale term frequencies by importance across the corpus.

##Assemble features — combine TF–IDF vector with any other numeric features.

##Train Logistic Regression

In [None]:
df = df.filter(df.statement.isNotNull())
df = df.filter(df.statement != "")

In [None]:
df = df.filter(df.BinaryTarget.isNotNull())

# Check distinct classes
df.select("BinaryTarget").distinct().show()

+--------------------+
|        BinaryTarget|
+--------------------+
|               FALSE|
|        DUI offenses|
|      March 20, 2023|
|     Instagram posts|
|    October 25, 2022|
|                FAKE|
|    November 3, 2022|
|                TRUE|
|    November 2, 2022|
|    October 12, 2022|
|        May 26, 2023|
|      March 17, 2023|
|     August 18, 2022|
|          pants-fire|
|    November 4, 2022|
|      Facebook posts|
|     August 26, 2022|
|           half-true|
|       July 29, 2022|
|American Leadersh...|
+--------------------+
only showing top 20 rows



In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# 1️⃣ Start Spark Session
spark = SparkSession.builder \
    .appName("FakeNews_TFIDF_LR") \
    .getOrCreate()

# 2️⃣ Load CSV
df = spark.createDataFrame(pdf)

# 3️⃣ Keep only needed columns
# Adjust column names if your CSV is different
df = df.select("statement", "BinaryTarget")

# 4️⃣ Clean Data - Remove nulls & empty text
df = df.filter(col("statement").isNotNull())
df = df.filter(col("statement") != "")
df = df.filter(col("BinaryTarget").isNotNull())

# 5️⃣ Create binary label column
# Example: TRUE → 1, FALSE → 0
df = df.withColumn("label", when(col("BinaryTarget") == "REAL", 1).otherwise(0))

# 6️⃣ Tokenize, Remove Stopwords, Apply TF–IDF
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

tokenizer = Tokenizer(inputCol="statement", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

# 7️⃣ Logistic Regression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 8️⃣ Pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# 9️⃣ Train/Test Split — Ensure both have at least 2 classes
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Check class counts
train_labels = train.select("label").distinct().count()
test_labels = test.select("label").distinct().count()

if train_labels < 2 or test_labels < 2:
    raise ValueError("Train/Test split does not contain both label classes. Adjust split or data.")

# 🔟 Train model
model = pipeline.fit(train)

# 1️⃣1️⃣ Predictions
predictions = model.transform(test)
predictions.select("Statement", "label", "prediction", "probability").show(10, truncate=50)

# 1️⃣2️⃣ Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")

auc = evaluator.evaluate(predictions)
print("Test AUC:", auc)


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/content/spark-3.5.1-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# Check if any statement appears in both train and test
train_statements = set([row.statement for row in train.collect()])
test_statements = set([row.statement for row in test.collect()])
overlap = train_statements.intersection(test_statements)
print(f"Overlap count: {len(overlap)}")

Overlap count: 870


In [None]:
# Remove exact duplicates based on 'Statement'
df_clean = df.dropDuplicates(["statement"])

# Now split into train/test without leakage
train, test = df_clean.randomSplit([0.8, 0.2], seed=42)

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=ParamGridBuilder().build(),
    evaluator=BinaryClassificationEvaluator(),
    numFolds=5
)

cvModel = cv.fit(df_clean)

KeyboardInterrupt: 

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col

# -------------------
# 1. Start Spark Session
# -------------------
spark = SparkSession.builder \
    .appName("LogisticRegressionTFIDF_Binary") \
    .getOrCreate()

# -------------------
# 2. Load Data
# -------------------
df = spark.read.csv("merged_final.csv", header=True, inferSchema=True)

# -------------------
# 3. Ensure Binary Labels
# -------------------
# Keep only rows where BinaryTarget is 0, 1, True, or False
df_clean = df.filter(col("BinaryTarget").isin(0, 1, "0", "1", "REAL", "FAKE"))

# Drop duplicates on text column to avoid leakage
df_clean = df_clean.dropDuplicates(["statement"])

# -------------------
# 4. Prepare ML Pipeline
# -------------------
# Convert label to numeric (0 or 1)
label_indexer = StringIndexer(inputCol="BinaryTarget", outputCol="label")

# Tokenize text
tokenizer = Tokenizer(inputCol="statement", outputCol="words")

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Term Frequency
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=5000)

# Inverse Document Frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Logistic Regression (binary)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20, regParam=0.01)

# Full pipeline
pipeline = Pipeline(stages=[label_indexer, tokenizer, remover, hashingTF, idf, lr])

# -------------------
# 5. Train-Test Split
# -------------------
train, test = df_clean.randomSplit([0.8, 0.2], seed=42)

# -------------------
# 6. Train Model
# -------------------
model = pipeline.fit(train)

# -------------------
# 7. Predictions
# -------------------
predictions = model.transform(test)

# -------------------
# 8. Evaluation
# -------------------
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

predictions.select("statement", "BinaryTarget", "prediction", "probability").show(10, truncate=True)
print(f"Test AUC: {auc:.3f}")


#PySpark Assignment: Loan Data Analysis
#Dataset: loan.csv (contains loan applications with customer details, loan amount, purpose, and status)

[Loan Dataset](https://github.com/makhan010385/DataBricks-/blob/main/loan.csv)


#Objective:
##We will use PySpark to load, process, and analyze the loan dataset, applying DataFrame operations, Spark SQL, and Machine Learning concepts.

##Part 1 — Data Loading & Exploration
###Start a SparkSession in PySpark.

###Load loan.csv into a PySpark DataFrame with headers.

###Display the first 10 rows of the dataset.

###Show the schema of the DataFrame.

###Count the total number of records.

##Part 2 — Data Cleaning
###Remove duplicate rows.

####Handle missing values:

###Drop rows with missing loan_status or loan_amnt.

###Convert numeric columns to the correct data types (e.g., loan_amnt to integer).

##Part 3 — Data Analysis
###Find the average loan amount for each loan purpose.

###Count how many loans were Fully Paid vs Charged Off.

###Find the top 5 loan purposes by average funded amount.

##Part 4 — Spark SQL
###Register the DataFrame as a temporary SQL view.

###Using Spark SQL:

###Get the loan purpose with the highest default rate.

###Find the state with the most charged-off loans.

##Part 5 — Machine Learning (Optional)
###Use StringIndexer to encode categorical columns like purpose and loan_status.

###Use VectorAssembler to combine features.

###Train a Logistic Regression model to predict loan_status (binary classification: Fully Paid vs Charged Off).

###Evaluate model accuracy.

### **Part 1 Solution ( Data Loading & Exploration )**
Start a SparkSession in PySpark.
Load loan.csv into a PySpark DataFrame with headers.
Display the first 10 rows of the dataset.
Show the schema of the DataFrame.
Count the total number of records.


In [49]:
from pyspark.sql import SparkSession
import pandas as pd

# Create Spark session
spark = SparkSession.builder \
    .appName("LocalSparkExample") \
    .master("local[*]") \
    .getOrCreate()

# Step 1: Use RAW GitHub CSV link
url = "https://raw.githubusercontent.com/makhan010385/DataBricks-/refs/heads/main/loan.csv"

# Step 2: Load CSV into Pandas DataFrame
pdf = pd.read_csv(url)

# Step 3: Convert Pandas → Spark DataFrame
df = spark.createDataFrame(pdf)

# Step 4: Show results
df.show(10)
df.printSchema()
df.count()


+-----------+---+------+-----------------+--------------+-----------+-------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|Customer_ID|Age|Gender|       Occupation|Marital Status|Family Size| Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|
+-----------+---+------+-----------------+--------------+-----------+-------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|    IB14001| 30|  MALE|     BANK MANAGER|        SINGLE|          4|50000.0|    22199.0|            6|      HOUSING| 10,00,000 |      5|      42,898|               6|                 9|
|    IB14008| 44|  MALE|        PROFESSOR|       MARRIED|          6|51000.0|    19999.0|            4|     SHOPPING|     50,000|      3|      33,999|               1|                 5|
|    IB14012| 30|FEMALE|          DENTIST|        SINGLE|        

500

### **Part 2 Solution (Data Cleaning)**
Remove duplicate rows.
Handle missing values:
Drop rows with missing loan_status or loan_amnt.
Convert numeric columns to the correct data types (e.g., loan_amnt to integer).


In [50]:
df.dropDuplicates()
df = df.dropna(subset=["loan Amount"])
df = df.withColumn("loan Amount", df["loan Amount"].cast("integer"))
df.show(10)
df.printSchema()
df.count()

+-----------+---+------+-----------------+--------------+-----------+-------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|Customer_ID|Age|Gender|       Occupation|Marital Status|Family Size| Income|Expenditure|Use Frequency|Loan Category|loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|
+-----------+---+------+-----------------+--------------+-----------+-------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|    IB14001| 30|  MALE|     BANK MANAGER|        SINGLE|          4|50000.0|    22199.0|            6|      HOUSING|       NULL|      5|      42,898|               6|                 9|
|    IB14008| 44|  MALE|        PROFESSOR|       MARRIED|          6|51000.0|    19999.0|            4|     SHOPPING|       NULL|      3|      33,999|               1|                 5|
|    IB14012| 30|FEMALE|          DENTIST|        SINGLE|        

500

### **Part 3 Solution (Data Analysis)**
Find the average loan amount for each loan purpose.
Count how many loans were Fully Paid vs Charged Off.
Find the top 5 loan purposes by average funded amount.


In [51]:
df.groupBy("loan Category").agg({"loan Amount": "avg"}).show()
df.groupBy("loan Category").agg({"loan Amount": "avg"}).orderBy("avg(loan Amount)", ascending=False).show(5)

+------------------+----------------+
|     loan Category|avg(loan Amount)|
+------------------+----------------+
|           HOUSING|            NULL|
|        TRAVELLING|            NULL|
|       BOOK STORES|            NULL|
|         GOLD LOAN|            NULL|
|  EDUCATIONAL LOAN|            NULL|
|        AUTOMOBILE|            NULL|
|COMPUTER SOFTWARES|            NULL|
|           DINNING|            NULL|
|          SHOPPING|            NULL|
|       RESTAURANTS|            NULL|
|       ELECTRONICS|            NULL|
|          BUILDING|            NULL|
|   HOME APPLIANCES|            NULL|
|       AGRICULTURE|            NULL|
|          BUSINESS|            NULL|
|        RESTAURANT|            NULL|
+------------------+----------------+

+----------------+----------------+
|   loan Category|avg(loan Amount)|
+----------------+----------------+
|         HOUSING|            NULL|
|      TRAVELLING|            NULL|
|     BOOK STORES|            NULL|
|       GOLD LOAN|     

### **Part 4 Solution ( Spark SQL)**
Register the DataFrame as a temporary SQL view.
Using Spark SQL:
Get the loan purpose with the highest default rate.
Find the state with the most charged-off loans.

### **Part 5 Solution Machine Learning (Optional)**

Use StringIndexer to encode categorical columns like purpose and loan_status.
Use VectorAssembler to combine features.
Train a Logistic Regression model to predict loan_status (binary classification: Fully Paid vs Charged Off).
Evaluate model accuracy.
