In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local[*]").appName("SparkPractice").getOrCreate()

In [5]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [7]:
employee_df = spark.read.format("csv")\
.option("header","true")\
.option("inferschema","true")\
.option("mode","permissive")\
.load(r"C:\Users\jenil.makhansa\dataset.csv")
employee_df.show()

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        NULL|nominee5|
+---+--------+---+------+------------+--------+



In [7]:
from pyspark.sql.types import StringType,StructField,StructType,IntegerType
emp_schema = StructType(
    [
        StructField("id",IntegerType(),True),
        StructField("name",StringType(),True),
        StructField("age",IntegerType(),True),
        StructField("salary",IntegerType(),True),
        StructField("address",StringType(),True),
        StructField("nominee",StringType(),True),
        StructField("_corrupt_record",StringType(),True)
    ])

In [8]:
employee_df = spark.read.format("csv")\
.option("header","true")\
.option("inferschema","true")\
.schema(emp_schema)\
.option("badRecordsPath",r"C:\Users\jenil.makhansa\badrecords")\
.load(r"C:\Users\jenil.makhansa\dataset.csv")
employee_df.show()

+---+--------+---+------+------------+--------+--------------------+
| id|    name|age|salary|     address| nominee|     _corrupt_record|
+---+--------+---+------+------------+--------+--------------------+
|  1|  Manish| 26| 75000|       bihar|nominee1|                NULL|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|                NULL|
|  3|  Pritam| 22|150000|   Bangalore|   India|3,Pritam,22,15000...|
|  4|Prantosh| 17|200000|     Kolkata|   India|4,Prantosh,17,200...|
|  5|  Vikash| 31|300000|        NULL|nominee5|                NULL|
+---+--------+---+------+------------+--------+--------------------+



In [35]:
line_delimited_json = spark.read.format("json")\
.option("inferSchema","true")\
.option("mode","PERMISSIVE")\
.load(r"C:\Users\jenil.makhansa\line_delimited_json.json")
line_delimited_json.show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



In [38]:
df = spark.read.parquet(r"C:\Users\jenil.makhansa\sample-parquet.gz.parquet")
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [39]:
import pyarrow as pa
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile(r'C:\Users\nikita\Downloads\Spark-The-Definitive-Guide-master\data\flight-data\parquet\2010-summary.parquet\part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet')
parquet_file.metadata
parquet_file.metadata.row_group(0) 
parquet_file.metadata.row_group(0).column(0)
parquet_file.metadata.row_group(0).column(0).statistics 

ModuleNotFoundError: No module named 'pyarrow'

In [None]:
command1 : parquet-tools show path 
command2 : parquet-tools inspect path

In [65]:
employee_df.write.format("csv")\
.option("header","true")\
.mode("errorifexists")\
.option("path",r"C:\Users\jenil.makhansa\Export_data")\
.save()

In [66]:
employee_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- nominee: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [67]:
employee_df.columns

['id', 'name', 'age', 'salary', 'address', 'nominee', '_corrupt_record']

In [68]:
employee_df.select("name").show()

+--------+
|    name|
+--------+
|  Manish|
|  Nikita|
|  Pritam|
|Prantosh|
|  Vikash|
+--------+



In [71]:
employee_df.select(col("name")).show()

+--------+
|    name|
+--------+
|  Manish|
|  Nikita|
|  Pritam|
|Prantosh|
|  Vikash|
+--------+



In [None]:
# This will not work.
employee_df.select("id+5").show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `id+5` cannot be resolved. Did you mean one of the following? [`id`, `age`, `name`, `address`, `nominee`].;
'Project ['id+5]
+- Relation [id#542,name#543,age#544,salary#545,address#546,nominee#547,_corrupt_record#548] csv


In [74]:
employee_df.select(expr("id+5")).show()

+--------+
|(id + 5)|
+--------+
|       6|
|       7|
|       8|
|       9|
|      10|
+--------+



In [76]:
employee_df.select(col("id")+5).show()

+--------+
|(id + 5)|
+--------+
|       6|
|       7|
|       8|
|       9|
|      10|
+--------+



In [79]:
employee_df.select("id",col("name"),employee_df["salary"],employee_df.address).show()

+---+--------+------+------------+
| id|    name|salary|     address|
+---+--------+------+------------+
|  1|  Manish| 75000|       bihar|
|  2|  Nikita|100000|uttarpradesh|
|  3|  Pritam|150000|   Bangalore|
|  4|Prantosh|200000|     Kolkata|
|  5|  Vikash|300000|        NULL|
+---+--------+------+------------+



In [81]:
employee_df.select(expr("id as employee_id"),expr("concat(name,address)")).show()

+-----------+---------------------+
|employee_id|concat(name, address)|
+-----------+---------------------+
|          1|          Manishbihar|
|          2|   Nikitauttarpradesh|
|          3|      PritamBangalore|
|          4|      PrantoshKolkata|
|          5|                 NULL|
+-----------+---------------------+



In [9]:
employee_df.createOrReplaceTempView("employee_table")

In [10]:
spark.sql("""select * from employee_table""").show()

+---+--------+---+------+------------+--------+--------------------+
| id|    name|age|salary|     address| nominee|     _corrupt_record|
+---+--------+---+------+------------+--------+--------------------+
|  1|  Manish| 26| 75000|       bihar|nominee1|                NULL|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|                NULL|
|  3|  Pritam| 22|150000|   Bangalore|   India|3,Pritam,22,15000...|
|  4|Prantosh| 17|200000|     Kolkata|   India|4,Prantosh,17,200...|
|  5|  Vikash| 31|300000|        NULL|nominee5|                NULL|
+---+--------+---+------+------------+--------+--------------------+



In [11]:
employee_df.select(col("id").alias("employee_id"),"name","age").show()

+-----------+--------+---+
|employee_id|    name|age|
+-----------+--------+---+
|          1|  Manish| 26|
|          2|  Nikita| 23|
|          3|  Pritam| 22|
|          4|Prantosh| 17|
|          5|  Vikash| 31|
+-----------+--------+---+



In [12]:
employee_df.filter(col("salary")>150000).show()

+---+--------+---+------+-------+--------+--------------------+
| id|    name|age|salary|address| nominee|     _corrupt_record|
+---+--------+---+------+-------+--------+--------------------+
|  4|Prantosh| 17|200000|Kolkata|   India|4,Prantosh,17,200...|
|  5|  Vikash| 31|300000|   NULL|nominee5|                NULL|
+---+--------+---+------+-------+--------+--------------------+



In [13]:
employee_df.where(col("salary")>150000).show()

+---+--------+---+------+-------+--------+--------------------+
| id|    name|age|salary|address| nominee|     _corrupt_record|
+---+--------+---+------+-------+--------+--------------------+
|  4|Prantosh| 17|200000|Kolkata|   India|4,Prantosh,17,200...|
|  5|  Vikash| 31|300000|   NULL|nominee5|                NULL|
+---+--------+---+------+-------+--------+--------------------+



In [14]:
employee_df.filter((col("salary")>150000) & (col("age")<18)).show()

+---+--------+---+------+-------+-------+--------------------+
| id|    name|age|salary|address|nominee|     _corrupt_record|
+---+--------+---+------+-------+-------+--------------------+
|  4|Prantosh| 17|200000|Kolkata|  India|4,Prantosh,17,200...|
+---+--------+---+------+-------+-------+--------------------+



In [15]:
employee_df.select("*",lit("kumar").alias("last_name")).show()

+---+--------+---+------+------------+--------+--------------------+---------+
| id|    name|age|salary|     address| nominee|     _corrupt_record|last_name|
+---+--------+---+------+------------+--------+--------------------+---------+
|  1|  Manish| 26| 75000|       bihar|nominee1|                NULL|    kumar|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|                NULL|    kumar|
|  3|  Pritam| 22|150000|   Bangalore|   India|3,Pritam,22,15000...|    kumar|
|  4|Prantosh| 17|200000|     Kolkata|   India|4,Prantosh,17,200...|    kumar|
|  5|  Vikash| 31|300000|        NULL|nominee5|                NULL|    kumar|
+---+--------+---+------+------------+--------+--------------------+---------+



In [16]:
employee_df.withColumn("surname",lit("singh")).show()

+---+--------+---+------+------------+--------+--------------------+-------+
| id|    name|age|salary|     address| nominee|     _corrupt_record|surname|
+---+--------+---+------+------------+--------+--------------------+-------+
|  1|  Manish| 26| 75000|       bihar|nominee1|                NULL|  singh|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|                NULL|  singh|
|  3|  Pritam| 22|150000|   Bangalore|   India|3,Pritam,22,15000...|  singh|
|  4|Prantosh| 17|200000|     Kolkata|   India|4,Prantosh,17,200...|  singh|
|  5|  Vikash| 31|300000|        NULL|nominee5|                NULL|  singh|
+---+--------+---+------+------------+--------+--------------------+-------+



In [17]:
employee_df.withColumnRenamed("id","employee_id").show()

+-----------+--------+---+------+------------+--------+--------------------+
|employee_id|    name|age|salary|     address| nominee|     _corrupt_record|
+-----------+--------+---+------+------------+--------+--------------------+
|          1|  Manish| 26| 75000|       bihar|nominee1|                NULL|
|          2|  Nikita| 23|100000|uttarpradesh|nominee2|                NULL|
|          3|  Pritam| 22|150000|   Bangalore|   India|3,Pritam,22,15000...|
|          4|Prantosh| 17|200000|     Kolkata|   India|4,Prantosh,17,200...|
|          5|  Vikash| 31|300000|        NULL|nominee5|                NULL|
+-----------+--------+---+------+------------+--------+--------------------+



In [18]:
employee_df.withColumn("id",col("id").cast("string")).printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- nominee: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [19]:
employee_df.drop("id",col("name")).show()

+---+------+------------+--------+---------------+
|age|salary|     address| nominee|_corrupt_record|
+---+------+------------+--------+---------------+
| 26| 75000|       bihar|nominee1|           NULL|
| 23|100000|uttarpradesh|nominee2|           NULL|
| 22|150000|   Bangalore|   India|           NULL|
| 17|200000|     Kolkata|   India|           NULL|
| 31|300000|        NULL|nominee5|           NULL|
+---+------+------------+--------+---------------+



In [21]:
spark.sql("""
select * from employee_table where salary > 150000 and age<18
""").show()

+---+--------+---+------+-------+-------+--------------------+
| id|    name|age|salary|address|nominee|     _corrupt_record|
+---+--------+---+------+-------+-------+--------------------+
|  4|Prantosh| 17|200000|Kolkata|  India|4,Prantosh,17,200...|
+---+--------+---+------+-------+-------+--------------------+



In [22]:
spark.sql("""
select *,"kumar" as last_name from employee_table where salary > 150000 and age<18
""").show()

+---+--------+---+------+-------+-------+--------------------+---------+
| id|    name|age|salary|address|nominee|     _corrupt_record|last_name|
+---+--------+---+------+-------+-------+--------------------+---------+
|  4|Prantosh| 17|200000|Kolkata|  India|4,Prantosh,17,200...|    kumar|
+---+--------+---+------+-------+-------+--------------------+---------+



In [23]:
spark.sql("""
select *,"kumar" as last_name,concat(name,last_name) as full_name from employee_table where salary > 150000 and age<18
""").show()

+---+--------+---+------+-------+-------+--------------------+---------+-------------+
| id|    name|age|salary|address|nominee|     _corrupt_record|last_name|    full_name|
+---+--------+---+------+-------+-------+--------------------+---------+-------------+
|  4|Prantosh| 17|200000|Kolkata|  India|4,Prantosh,17,200...|    kumar|Prantoshkumar|
+---+--------+---+------+-------+-------+--------------------+---------+-------------+



In [24]:
spark.sql("""
select *,"kumar" as last_name,concat(name,last_name) as full_name,cast(id as string) from employee_table where salary > 150000 and age<18
""").show()

+---+--------+---+------+-------+-------+--------------------+---------+-------------+---+
| id|    name|age|salary|address|nominee|     _corrupt_record|last_name|    full_name| id|
+---+--------+---+------+-------+-------+--------------------+---------+-------------+---+
|  4|Prantosh| 17|200000|Kolkata|  India|4,Prantosh,17,200...|    kumar|Prantoshkumar|  4|
+---+--------+---+------+-------+-------+--------------------+---------+-------------+---+

