In [5]:
import sys
from pyspark.sql import SparkSession

In [6]:
# Start (or reuse) a local Spark session that runs on a single worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("spark-app-version-x")
    .getOrCreate()
)

In [7]:
# Location of the January 2024 yellow-taxi trip file on the local disk.
local_file = r'C:\Users\Yunis\Desktop\yellow_tripdata_2024-01.parquet'

# Load the Parquet file and print the column names and types Spark read from it.
df = spark.read.parquet(local_file)
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [8]:
# Preview ten trips with total_amount above 1, keeping only three columns.
(
    df.select('VendorID', 'tpep_pickup_datetime', 'total_amount')
    .where('total_amount>1')
    .show(n=10)
)

+--------+--------------------+------------+
|VendorID|tpep_pickup_datetime|total_amount|
+--------+--------------------+------------+
|       2| 2024-01-01 00:57:55|        22.7|
|       1| 2024-01-01 00:03:00|       18.75|
|       1| 2024-01-01 00:17:06|        31.3|
|       1| 2024-01-01 00:36:38|        17.0|
|       1| 2024-01-01 00:46:51|        16.1|
|       1| 2024-01-01 00:54:08|        41.5|
|       2| 2024-01-01 00:49:44|       64.95|
|       1| 2024-01-01 00:30:40|        30.4|
|       2| 2024-01-01 00:26:01|        36.0|
|       2| 2024-01-01 00:28:08|         8.0|
+--------+--------------------+------------+
only showing top 10 rows



In [9]:
# Expose the trips DataFrame to Spark SQL under the temporary view name 'yun'.
df.createOrReplaceTempView('yun')

In [10]:
# Query the 'yun' view with SQL and display the first five matching rows.
spark.sql(
    "select * from yun where total_amount>3"
).show(n=5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

In [11]:
# Shut down the Spark session bound to `spark`.
spark.stop()

In [12]:
import logging
from pyspark.sql import SparkSession

In [13]:
def rdd_to_dataframe(data, schema):
    """
    Build a Spark DataFrame from in-memory rows by way of an RDD.

    Args:
        data: Local collection of rows to distribute (the notebook passes
            lists of tuples).
        schema: Schema handed unchanged to ``createDataFrame`` (the notebook
            passes lists of column names).

    Returns:
        The DataFrame created from ``data``. The SparkSession is deliberately
        left running so the returned DataFrame remains usable.

    Raises:
        Exception: Any error raised while building the RDD or the DataFrame.
            It is logged with its traceback, the session is stopped, and the
            original exception is re-raised.
    """
    # getOrCreate() hands back the already-running session when there is one.
    spark = SparkSession.builder.appName("RDDToDataFrame").getOrCreate()

    try:
        # parallelize() lives on the SparkContext, not on the SparkSession.
        rdd = spark.sparkContext.parallelize(data)

        # Convert the RDD to a DataFrame and return it without stopping the session.
        return spark.createDataFrame(rdd, schema)

    except Exception as e:
        # Log with the traceback, release the session, then propagate the
        # failure. Falling through would return None and only surface later
        # as an unrelated AttributeError in the calling cell.
        logging.exception('Error while transforming RDD to DF: %s', e)
        spark.stop()
        raise


In [14]:
# Department rows as (department_id, department_name) plus the matching column names.
dept_data = [
    (1, "Big Data"),
    (2, "Finance"),
    (3, "Marketing"),
]
dept_schema = [
    "department_id",
    "department_name",
]

In [15]:
# Employee rows as (department_id, employee_name, age) plus the matching column names.
emp_data = [
    (1, "Carlos", 17),
    (1, "Bob", 30),
    (2, "Jasmin", 26),
]
emp_schema = [
    "department_id",
    "employee_name",
    "age",
]

In [16]:
# Turn each in-memory dataset into a DataFrame (employees first, then departments).
df_emp, df_dept = (
    rdd_to_dataframe(emp_data, emp_schema),
    rdd_to_dataframe(dept_data, dept_schema),
)

In [17]:
# Display the department rows to confirm the DataFrame was built as expected.
df_dept.show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|            1|       Big Data|
|            2|        Finance|
|            3|      Marketing|
+-------------+---------------+



In [18]:
# Bind a notebook-level `spark` handle again: the earlier one was stopped, and
# rdd_to_dataframe only keeps its session in a local variable.
spark = (
    SparkSession.builder
    .appName("RDDToDataFrame")
    .getOrCreate()
)

In [19]:
# Register both DataFrames as temporary views so they can be joined with Spark SQL.
df_emp.createOrReplaceTempView('employees')
df_dept.createOrReplaceTempView('departments')

In [20]:
# Join employees to their department and show only those aged 18 or over.
# Both sides are selected with '*', so department_id appears twice in the result.
spark.sql(
    '''select emp.*,dp.* from employees as emp join departments as dp on(emp.department_id=dp.department_id) where age>=18'''
).show()

+-------------+-------------+---+-------------+---------------+
|department_id|employee_name|age|department_id|department_name|
+-------------+-------------+---+-------------+---------------+
|            1|          Bob| 30|            1|       Big Data|
|            2|       Jasmin| 26|            2|        Finance|
+-------------+-------------+---+-------------+---------------+



In [10]:
# Register the employee/department join as the 'dp_employees' temporary view.
spark.sql(
    '''select emp.employee_name,emp.age,emp.department_id,dp.department_name from employees as emp join departments as dp on (emp.department_id=dp.department_id)'''
).createOrReplaceTempView('dp_employees')

In [11]:
# Display every row of the 'dp_employees' view.
spark.sql(
    '''select * from dp_employees'''
).show()

+-------------+---+-------------+---------------+
|employee_name|age|department_id|department_name|
+-------------+---+-------------+---------------+
|       Carlos| 17|            1|       Big Data|
|          Bob| 30|            1|       Big Data|
|       Jasmin| 26|            2|        Finance|
+-------------+---+-------------+---------------+



In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sqlfn
from pyspark.sql import types as sqlt

In [2]:
# Obtain a Spark session named "RDDToDataFrame", reusing a running one if present.
spark = (
    SparkSession.builder
    .appName("RDDToDataFrame")
    .getOrCreate()
)

In [3]:
# Session for the streaming lab, requested with two local worker threads.
# NOTE(review): getOrCreate() returns an already-running session when one exists
# (e.g. the one created in the previous cell), in which case master('local[2]')
# presumably does not take effect - confirm, or drop the earlier builder cell.
spark = (
    SparkSession.builder
    .appName('SparkStreamingLab1')
    .master('local[2]')
    .getOrCreate()
)

In [4]:
# Read the department budgets with the "multiline" JSON option enabled.
# The Windows path is a raw string so its backslashes are never interpreted as
# escape sequences (a plain literal here triggers invalid-escape warnings).
df_budgets = (
    spark.read
    .option("multiline", "true")
    .json(r'C:\Windows\TEMP\Rar$DIa3316.41882\department_budgets.json')
)

In [5]:
# Print the nested schema Spark inferred from the budgets JSON.
df_budgets.printSchema()

root
 |-- budget: long (nullable = true)
 |-- budget_authorizer: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cto: struct (nullable = true)
 |    |    |    |-- last_name: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- office: string (nullable = true)
 |-- budget_period: string (nullable = true)
 |-- department_id: long (nullable = true)
 |-- offices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cost_center: struct (nullable = true)
 |    |    |    |-- budget_status: string (nullable = true)
 |    |    |    |-- office: string (nullable = true)



In [26]:
# Show the untruncated 'offices' array for department 1.
(
    df_budgets.select('offices')
    .where('department_id == 1')
    .show(truncate=False)
)

+-------------------------------------------------------------------------+
|offices                                                                  |
+-------------------------------------------------------------------------+
|[{{denied, new york}}, {{approved, mumbai}}, {{approved, san francisco}}]|
+-------------------------------------------------------------------------+



In [29]:
# Register the budgets DataFrame as the 'budgets_json' temporary view for SQL queries.
df_budgets.createOrReplaceTempView('budgets_json')

In [None]:
# Join each employee with the budget of their department and flatten the nested
# JSON: the first three entries of `offices` and the first `budget_authorizer`
# entry become plain columns; a missing CTO name is replaced by a placeholder.
# The employee side is the 'dp_employees' view registered earlier in the notebook.
# NOTE(review): that view must exist in the current Spark session - temporary
# views do not survive a kernel restart, so re-run the cell that creates it first.
spark.sql('''
          select  emp.employee_name, 
                  emp.department_id, 
                  bud.budget, 
                  bud.budget_period, 
                  bud.offices[0].cost_center.office as office_1,
                  bud.offices[0].cost_center.budget_status as budget_status_1,
                  bud.offices[1].cost_center.office as office_2,
                  bud.offices[1].cost_center.budget_status as budget_status_2,
                  bud.offices[2].cost_center.office as office_3,
                  bud.offices[2].cost_center.budget_status as budget_status_3,
                  nvl(bud.budget_authorizer[0].cto.name,"no CTO registered")  as cto_name,
                  nvl(bud.budget_authorizer[0].cto.last_name,"no CTO registered") as cto_last_name
          from dp_employees as emp
            inner join budgets_json as bud on (emp.department_id = bud.department_id)
          ''').show()

In [31]:
# Explicit schema for the employee CSV files: five columns, all nullable.
schema_employee = sqlt.StructType([
    sqlt.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('employee_id', sqlt.IntegerType()),
        ('department_name', sqlt.StringType()),
        ('name', sqlt.StringType()),
        ('last_name', sqlt.StringType()),
        ('hire_timestamp', sqlt.TimestampType()),
    )
])

In [None]:
# Streaming reader over the CSV folder, using the explicit employee schema.
# Files are read with a header row, at most one file per trigger.
df_employees = (
    spark.readStream
    .format('csv')
    .schema(schema_employee)
    .option('header', True)
    .option('maxFilesPerTrigger', 1)
    .load(r'datasets/csv/')
)

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) a single-threaded local Spark session for the export job.
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('spark-version-x')
    .getOrCreate()
)

In [4]:
# Source Parquet file with the yellow-taxi trips.
input_path = r'C:\Users\Yunis\production_data\yellow_taxi\yellow_tripdata_2024-01.parquet'

# Destination folder for the filtered output.
output_path = r'C:\Users\Yunis\production_data\blue_taxi'


In [5]:
# Load the trips from the Parquet file at input_path.
df=spark.read.parquet(input_path)

In [6]:
# Register the trips as the 'yellow_taxi' temporary view so they can be filtered with SQL.
df.createOrReplaceTempView('yellow_taxi')

In [7]:
# Keep three columns of every trip whose total_amount is above 1.
df_output = spark.sql(
    '''select VendorID,tpep_pickup_datetime,passenger_count from yellow_taxi where total_amount>1'''
)

In [11]:
# Write the filtered trips as Parquet to output_path, replacing any previous output there.
# NOTE(review): the recorded run failed during job commit with UnsatisfiedLinkError in
# NativeIO$Windows.access0. This presumably means the Hadoop native Windows binaries
# (hadoop.dll / winutils.exe under %HADOOP_HOME%\bin) are missing or do not match the
# bundled Hadoop version - confirm and fix the environment before re-running.
df_output.write.mode('overwrite').parquet(output_path)

Py4JJavaError: An error occurred while calling o43.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
