In [None]:
!pip install pyspark py4j



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigDataApp").getOrCreate()

In [None]:
#read csv
df = spark.read.csv("/content/IMDB Dataset (1).csv", header=True, inferSchema=True)
#header=True tells Spark to use the first row as column names.

#inferSchema=True automatically detects data types.

In [None]:
df

DataFrame[review: string, sentiment: string]

In [None]:
# Reading JSON
#df = spark.read.json("data.json")

# Reading Parquet
#df = spark.read.parquet("data.parquet")

# Hive (if enabled)
#df = spark.sql("SELECT * FROM hive_table")

#**New** **Section**

3-inspecting and understanding data

In [None]:
df.show(5)         # Shows top 5 rows
df.printSchema()   # Prints the data schema
df.columns         # Returns list of column names


+--------------------+--------------------+
|              review|           sentiment|
+--------------------+--------------------+
|One of the other ...|            positive|
|"A wonderful litt...| not only is it w...|
|"I thought this w...| but spirited you...|
|Basically there's...|            negative|
|"Petter Mattei's ...| power and succes...|
+--------------------+--------------------+
only showing top 5 rows

root
 |-- review: string (nullable = true)
 |-- sentiment: string (nullable = true)



['review', 'sentiment']

In [None]:
df.show(n=df.count())

+--------------------+--------------------+
|              review|           sentiment|
+--------------------+--------------------+
|One of the other ...|            positive|
|"A wonderful litt...| not only is it w...|
|"I thought this w...| but spirited you...|
|Basically there's...|            negative|
|"Petter Mattei's ...| power and succes...|
|"Probably my all-...| but that only ma...|
|I sure would like...|            positive|
|This show was an ...|            negative|
|Encouraged by the...|            negative|
|If you like origi...|            positive|
|"Phil the Alien i...|            negative|
|I saw this movie ...|            negative|
|"So im not a big ...| meaning most of ...|
|The cast played S...|            negative|
|This a fantastic ...|            positive|
|Kind of drawn in ...|            negative|
|Some films just s...|            positive|
|This movie made i...|            negative|
|I remember this f...|            positive|
|An awful film! It...|          

In [None]:
df.describe().show()   # Summary stats (count, mean, stddev, min, max)


+-------+--------------------+--------------------+
|summary|              review|           sentiment|
+-------+--------------------+--------------------+
|  count|               49885|               49878|
|   mean|                NULL|              1392.0|
| stddev|                NULL|   959.7489255008312|
|    min|!!! Spoiler alert...| ! We are not stu...|
|    max|ý thýnk uzak ýs t...|you don't just ha...|
+-------+--------------------+--------------------+



4.Data filtering and selection

In [None]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("IMDB Analysis").getOrCreate()

# Load the CSV into a PySpark DataFrame
df = spark.read.csv("IMDB Dataset (1).csv", header=True, inferSchema=True)

# Show selected columns
df.select("review", "sentiment").show(5)

# Filter rows where sentiment is positive
df.filter(df["sentiment"] == "positive").show(5)

# Filter rows using SQL-style string expression
df.where("sentiment = 'negative'").show(5)


+--------------------+--------------------+
|              review|           sentiment|
+--------------------+--------------------+
|One of the other ...|            positive|
|"A wonderful litt...| not only is it w...|
|"I thought this w...| but spirited you...|
|Basically there's...|            negative|
|"Petter Mattei's ...| power and succes...|
+--------------------+--------------------+
only showing top 5 rows

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|I sure would like...| positive|
|If you like origi...| positive|
|This a fantastic ...| positive|
|Some films just s...| positive|
+--------------------+---------+
only showing top 5 rows

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|Basically there's...| negative|
|This show was an ...| negative|
|Encouraged by the...| negative|
|"Phil the Alien i...| negative|
|I saw this movie ...| neg

5- Group By and Aggregate

In [None]:
df.groupBy("sentiment").count().show()


+--------------------+-----+
|           sentiment|count|
+--------------------+-----+
| ""Nightmare"" is...|    1|
| he really kills ...|    1|
| while others wil...|    1|
| ""La Noche del T...|    1|
|"" which apparent...|    1|
|"" and felt a lit...|    1|
|"" I think you wi...|    1|
| a Spanish motion...|    1|
| which has turned...|    1|
| I suggest giving...|    1|
| they come up wit...|    1|
| one of the few h...|    1|
| while a guy who ...|    1|
| this is another ...|    1|
| Andres is snoozi...|    1|
|     you have chases|    1|
| or other caper/h...|    1|
| then this film i...|    1|
| Israel is one of...|    1|
| a bunch of lonel...|    1|
+--------------------+-----+
only showing top 20 rows



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, length

# Start Spark session
spark = SparkSession.builder.appName("IMDB Review Analysis").getOrCreate()

# Load dataset
df = spark.read.csv("IMDB Dataset (1).csv", header=True, inferSchema=True)

# Add a new column: review_length
df = df.withColumn("review_length", length("review"))

# Group by sentiment and calculate average review length
df.groupBy("sentiment").agg(avg("review_length").alias("avg_review_length")).show()


+--------------------+-----------------+
|           sentiment|avg_review_length|
+--------------------+-----------------+
| ""Nightmare"" is...|            100.0|
| he really kills ...|            380.0|
| while others wil...|             74.0|
| ""La Noche del T...|            407.0|
|"" which apparent...|            554.0|
|"" and felt a lit...|             80.0|
|"" I think you wi...|            296.0|
| a Spanish motion...|            406.0|
| which has turned...|             91.0|
| I suggest giving...|            514.0|
| they come up wit...|            481.0|
| one of the few h...|            419.0|
| while a guy who ...|           1005.0|
| this is another ...|            347.0|
| Andres is snoozi...|            497.0|
|     you have chases|            320.0|
| or other caper/h...|             51.0|
| then this film i...|           2072.0|
| Israel is one of...|            585.0|
| a bunch of lonel...|            348.0|
+--------------------+-----------------+
only showing top

6-Modifying Columns

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length

# Create Spark session
spark = SparkSession.builder.appName("IMDB Dataset Transformation").getOrCreate()

# Load dataset
df = spark.read.csv("IMDB Dataset (1).csv", header=True, inferSchema=True)

# Add a column for review length
df = df.withColumn("review_length", length(col("review")))

# Create a new column with 10% increased review length (just for demo purposes)
df = df.withColumn("increased_length", col("review_length") * 1.1)

# Show results
df.select("review", "sentiment", "review_length", "increased_length").show(5)


+--------------------+--------------------+-------------+------------------+
|              review|           sentiment|review_length|  increased_length|
+--------------------+--------------------+-------------+------------------+
|One of the other ...|            positive|         1761|1937.1000000000001|
|"A wonderful litt...| not only is it w...|          433|             476.3|
|"I thought this w...| but spirited you...|          724| 796.4000000000001|
|Basically there's...|            negative|          748| 822.8000000000001|
|"Petter Mattei's ...| power and succes...|          200|220.00000000000003|
+--------------------+--------------------+-------------+------------------+
only showing top 5 rows



In [None]:
df = df.withColumnRenamed("review", "text").withColumnRenamed("sentiment", "label")
df.show(5)


+--------------------+--------------------+-------------+------------------+
|                text|               label|review_length|  increased_length|
+--------------------+--------------------+-------------+------------------+
|One of the other ...|            positive|         1761|1937.1000000000001|
|"A wonderful litt...| not only is it w...|          433|             476.3|
|"I thought this w...| but spirited you...|          724| 796.4000000000001|
|Basically there's...|            negative|          748| 822.8000000000001|
|"Petter Mattei's ...| power and succes...|          200|220.00000000000003|
+--------------------+--------------------+-------------+------------------+
only showing top 5 rows



In [None]:
df = df.drop("review")
df.show(5)


+--------------------+--------------------+-------------+------------------+
|                text|               label|review_length|  increased_length|
+--------------------+--------------------+-------------+------------------+
|One of the other ...|            positive|         1761|1937.1000000000001|
|"A wonderful litt...| not only is it w...|          433|             476.3|
|"I thought this w...| but spirited you...|          724| 796.4000000000001|
|Basically there's...|            negative|          748| 822.8000000000001|
|"Petter Mattei's ...| power and succes...|          200|220.00000000000003|
+--------------------+--------------------+-------------+------------------+
only showing top 5 rows



In [None]:
print(df.columns)


['text', 'label', 'review_length', 'increased_length']


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

# Start Spark session
spark = SparkSession.builder.appName("IMDB Join Example").getOrCreate()

# Load dataset
df1 = spark.read.csv("IMDB Dataset (1).csv", header=True, inferSchema=True)

# Add a synthetic ID column to both dataframes
df1 = df1.withColumn("id", monotonically_increasing_id())
df2 = df1.withColumnRenamed("review", "review_copy").withColumnRenamed("sentiment", "sentiment_copy")

# Perform inner join on 'id'
joined_df = df1.join(df2, df1.id == df2.id, "inner")

# Show some joined rows
joined_df.select("review", "review_copy", "sentiment", "sentiment_copy").show(5)


+--------------------+--------------------+--------------------+--------------------+
|              review|         review_copy|           sentiment|      sentiment_copy|
+--------------------+--------------------+--------------------+--------------------+
|One of the other ...|One of the other ...|            positive|            positive|
|"A wonderful litt...|"A wonderful litt...| not only is it w...| not only is it w...|
|"Probably my all-...|"Probably my all-...| but that only ma...| but that only ma...|
|I sure would like...|I sure would like...|            positive|            positive|
|This show was an ...|This show was an ...|            negative|            negative|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Save DataFrame to CSV
df.write.mode("overwrite").csv("output/imdb_csv", header=True)

# Save DataFrame to Parquet
df.write.mode("overwrite").parquet("output/imdb_parquet")


In [None]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName("IMDB SQL Query").getOrCreate()

# Load dataset
df = spark.read.csv("IMDB Dataset (1).csv", header=True, inferSchema=True)

# Register the DataFrame as a temporary SQL table
df.createOrReplaceTempView("imdb")

# Run SQL query: count number of positive and negative reviews
result = spark.sql("SELECT sentiment, COUNT(*) as total FROM imdb GROUP BY sentiment")
result.show()


+--------------------+-----+
|           sentiment|total|
+--------------------+-----+
| ""Nightmare"" is...|    1|
| he really kills ...|    1|
| while others wil...|    1|
| ""La Noche del T...|    1|
|"" which apparent...|    1|
|"" and felt a lit...|    1|
|"" I think you wi...|    1|
| a Spanish motion...|    1|
| which has turned...|    1|
| I suggest giving...|    1|
| they come up wit...|    1|
| one of the few h...|    1|
| while a guy who ...|    1|
| this is another ...|    1|
| Andres is snoozi...|    1|
|     you have chases|    1|
| or other caper/h...|    1|
| then this film i...|    1|
| Israel is one of...|    1|
| a bunch of lonel...|    1|
+--------------------+-----+
only showing top 20 rows



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("DataFrame vs RDD").getOrCreate()

# Create DataFrame from a list of numbers
df = spark.createDataFrame([(1,), (2,), (3,), (4,)], ["number"])

# Add a new column with squared values
df = df.withColumn("squared", col("number") * col("number"))

df.show()


+------+-------+
|number|squared|
+------+-------+
|     1|      1|
|     2|      4|
|     3|      9|
|     4|     16|
+------+-------+

