
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/Nilesh_Introduction__1_.pdf"
file_type = "pdf"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

display(df)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2522491330981311>:12[0m
[1;32m      8[0m delimiter [38;5;241m=[39m [38;5;124m"[39m[38;5;124m,[39m[38;5;124m"[39m
[1;32m     10[0m [38;5;66;03m# The applied options are for CSV files. For other file types, these will be ignored.[39;00m
[1;32m     11[0m df [38;5;241m=[39m (
[0;32m---> 12[0m     spark[38;5;241m.[39mread[38;5;241m.[39mformat(file_type)
[1;32m     13[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124minferSchema[39m[38;5;124m"[39m, infer_schema)
[1;32m     14[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m, first_row_is_header)
[1;32m     15[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124msep[39m[38;5;124m"[39m, delimiter)
[1;32m     16[0m     [38;5;241m.[39mload(file_location)
[1;32

In [0]:
# Create a view or table

temp_table_name = "Nilesh_Introduction__1__pdf"

df.createOrReplaceTempView(temp_table_name)



In [0]:
%sql
/* Query the created temp table in a SQL cell */
select
  *
from
  `Nilesh_Introduction__1__pdf`



In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "Nilesh_Introduction__1__pdf"

# df.write.format("parquet").saveAsTable(permanent_table_name)



In [0]:
%fs ls '/FileStore/tables/'



In [0]:
rdd1 = sc.textFile("/FileStore/tables/new.txt/")



In [0]:
rdd1.collect()



In [0]:
rdd2 = rdd1.flatMap(lambda line: line.split(" "))



In [0]:
rdd2.collect()



In [0]:
rdd3 = rdd2.map(lambda word: (word, 1))



In [0]:
rdd3.collect()



In [0]:
rdd1.getNumPartitions()



In [0]:
rdd2.getNumPartitions()



In [0]:
rdd4 = rdd3.reduceByKey(lambda x, y: x + y)



In [0]:
rdd4.collect()



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, lower

# Initialize Spark Session
spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Input sentence
sentence = "spark is cool spark is fast spark is a great i love spark"

# Create DataFrame
df = spark.createDataFrame([(sentence,)], ["sentence"])

# Split sentence into words, explode into rows, and convert to lowercase
words_df = df.select(explode(split(col("sentence"), " ")).alias("word"))

# Filter for the word "spark" and count occurrences
count = words_df.filter(lower(col("word")) == "spark").count()

print(f"Count of the word 'spark': {count}")



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, lower, count

# Initialize Spark Session
spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Input sentence
sentence = "spark is cool spark is fast spark is a great i love spark"

# Create DataFrame
df = spark.createDataFrame([(sentence,)], ["sentence"])

# Split sentence into words, explode into rows, and convert to lowercase
words_df = df.select(explode(split(col("sentence"), " ")).alias("word"))

# Group by words and count occurrences
word_counts = words_df.groupBy(lower(col("word")).alias("word")).count()

# Show the word counts
word_counts.show()



In [0]:
data = [("Yogesh", 29), ("Saurabh", 19)]
column = ["Name", "age"]

df = spark.createDataFrame(data, column)
df.display()



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("data1").getOrCreate()

data = [("A", 60, 2020), ("B", 70, 2020), ("C", 80, 2020)]

columns = ["course", "Percentile", "Year"]

df = spark.createDataFrame(data, schema=columns)

# df.show()

# we want percentile greater than 60 ok

greater_df = df.filter(col("Percentile") > 60)

greater_df.display()



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dense_rank
from pyspark.sql.window import Window


spark = SparkSession.builder.appName("SecondHighest").getOrCreate()

data = [("A", 20000, 30), ("b", 40000, 30), ("c", 60000, 30)]

df = spark.createDataFrame(data, ["Name", "Salary", "age"])
df.show()

withWindowspec = Window.orderBy(col("Salary").desc())

df_withrank = df.withColumn("rank", dense_rank().over(withWindowspec))


df_withrank.display()

df_second = df_withrank.filter(col("rank") == 2).show()



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Initialize Spark Session
spark = SparkSession.builder.appName("PivotExample").getOrCreate()

# Sample Data
data = [("A", 2022, 100), ("A", 2023, 150), ("B", 2022, 200), ("B", 2023, 250)]

# Create DataFrame
columns = ["Product", "Year", "Sales"]
df = spark.createDataFrame(data, columns)

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# Pivot the DataFrame (Converting rows into columns)
pivot_df = df.groupBy("Product").pivot("Year").agg(sum("Sales"))

# Show the pivoted DataFrame (rows converted into columns)
print("Pivoted DataFrame:")
pivot_df.show()



# implemndation of when condition 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create a Spark session
spark = SparkSession.builder.appName("WhenExample").getOrCreate()

# Sample data
data = [(1, 10), (2, 25), (3, 70), (4, 15), (5, 45)]

schema = """
          Id INT,
          age INT
"""
df10 = spark.createDataFrame(data, schema)
df10.show()

df11 = df10.withColumn(
    "Criteria",
    when(col("age") < 18, "Child")
    .when((col("age") >= 18) & (col("age") <= 60), "Adult")
    .otherwise("senior"),
)

df11.show()

+---+---+
| Id|age|
+---+---+
|  1| 10|
|  2| 25|
|  3| 70|
|  4| 15|
|  5| 45|
+---+---+

+---+---+--------+
| Id|age|Criteria|
+---+---+--------+
|  1| 10|   Child|
|  2| 25|   Adult|
|  3| 70|  senior|
|  4| 15|   Child|
|  5| 45|   Adult|
+---+---+--------+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('checkGender').getOrCreate()

data = [
    ("Alice", 25, "F", 3000),
    ("Bob", 30, "M", 4000),
    ("Cathy", 27, "F", 5000),
    ("David", 35, "M", 4500),
    ("Eva", 29, "F", None)
]

column = """
         Name STRING,
         Age INT,
         Gender STRING,
         Salary INT
"""

df_data =spark.createDataFrame(data, column)
print("from the dataframe")
df_data.show()

# df_female = df_data.filter(col('Gender') == 'F')
# df_female.show()

df_withBonus = df_data.withColumn('Bonus', when(col('Salary').isNotNull(),col('Salary')*.10).otherwise(0))
df_withBonus.show()
df_withBonus_with_filter = df_withBonus.filter(col('Age') > 28)
df_withBonus_with_filter.show()

df_withBonus_with_filter_grade = df_withBonus.withColumn('Grade',expr("CASE WHEN age >= 30 THEN 'Senior' ELSE 'Junior' END"))
df_withBonus_with_filter_grade.display()

df_withBonus_with_filter_grade_agg = df_withBonus_with_filter_grade.groupBy('Gender').agg(expr("avg(Age)").alias('Average_age'),expr("SUM(Salary)").alias('Total_salary'))
df_withBonus_with_filter_grade_agg.display()

from the dataframe
+-----+---+------+------+
| Name|Age|Gender|Salary|
+-----+---+------+------+
|Alice| 25|     F|  3000|
|  Bob| 30|     M|  4000|
|Cathy| 27|     F|  5000|
|David| 35|     M|  4500|
|  Eva| 29|     F|  null|
+-----+---+------+------+

+-----+---+------+------+-----+
| Name|Age|Gender|Salary|Bonus|
+-----+---+------+------+-----+
|Alice| 25|     F|  3000|300.0|
|  Bob| 30|     M|  4000|400.0|
|Cathy| 27|     F|  5000|500.0|
|David| 35|     M|  4500|450.0|
|  Eva| 29|     F|  null|  0.0|
+-----+---+------+------+-----+

+-----+---+------+------+-----+
| Name|Age|Gender|Salary|Bonus|
+-----+---+------+------+-----+
|  Bob| 30|     M|  4000|400.0|
|David| 35|     M|  4500|450.0|
|  Eva| 29|     F|  null|  0.0|
+-----+---+------+------+-----+



Name,Age,Gender,Salary,Bonus,Grade
Alice,25,F,3000.0,300.0,Junior
Bob,30,M,4000.0,400.0,Senior
Cathy,27,F,5000.0,500.0,Junior
David,35,M,4500.0,450.0,Senior
Eva,29,F,,0.0,Junior


Gender,Average_age,Total_salary
F,27.0,8000
M,32.5,8500


# 