In [22]:
#Importing the packages
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as F

In [3]:
#Creating the SparkSession
spark = SparkSession.builder.appName("FirstApp").getOrCreate()

In [4]:
#Defining schema for your DataFrame
myschema = StructType([\
                       StructField("userID", IntegerType(), True),
                       StructField("name", StringType(), True),
                       StructField("age",IntegerType(), True),
                       StructField("friends",IntegerType(), True),
                        ])

In [5]:
#Creating DataFrame on a CSV file
people = spark.read.format("csv")\
    .schema(myschema)\
    .option("path","fakefriends.csv")\
    .load()

In [6]:
#Performing all thetransformations
output = people.select(people.userID,people.name\
                       ,people.age,people.friends)\
         .where(people.age < 30).withColumn('insert_ts', F.current_timestamp())\
         .orderBy(people.userID)

In [7]:
#taking the count of o/p DataFrame
output.count()

112

In [8]:
#Creating a Temp View
output.createOrReplaceTempView("peoples")

In [11]:
#Running a simple Spark SQL query
df = spark.table("peoples")

In [12]:
df.show()

+------+--------+---+-------+--------------------+
|userID|    name|age|friends|           insert_ts|
+------+--------+---+-------+--------------------+
|     1|Jean-Luc| 26|      2|2023-12-21 19:35:...|
|     9|    Hugh| 27|    181|2023-12-21 19:35:...|
|    16|  Weyoun| 22|    323|2023-12-21 19:35:...|
|    21|   Miles| 19|    268|2023-12-21 19:35:...|
|    24|  Julian| 25|      1|2023-12-21 19:35:...|
|    25|     Ben| 21|    445|2023-12-21 19:35:...|
|    26|  Julian| 22|    100|2023-12-21 19:35:...|
|    32|     Nog| 26|    281|2023-12-21 19:35:...|
|    35| Beverly| 27|    305|2023-12-21 19:35:...|
|    46|    Morn| 25|     96|2023-12-21 19:35:...|
|    47|   Brunt| 24|     49|2023-12-21 19:35:...|
|    48|     Nog| 20|      1|2023-12-21 19:35:...|
|    52| Beverly| 19|    269|2023-12-21 19:35:...|
|    54|   Brunt| 19|      5|2023-12-21 19:35:...|
|    60|  Geordi| 20|    100|2023-12-21 19:35:...|
|    66|  Geordi| 21|    477|2023-12-21 19:35:...|
|    72|  Kasidy| 22|    179|20

In [51]:
df.groupBy("name")\
.agg(F.sum("friends").alias("friends_sum"), F.countDistinct("friends").alias("qty"))\
.orderBy(F.col("qty").desc())\
.show(5)

+-------+-----------+---+
|   name|friends_sum|qty|
+-------+-----------+---+
|Beverly|       2833|  8|
| Kasidy|       1538|  7|
|  Brunt|        961|  7|
|  Dukat|       1565|  6|
|    Nog|       1208|  6|
+-------+-----------+---+
only showing top 5 rows



In [59]:
spark.sql("create database learn_spark_db")
spark.sql("use learn_spark_db")

DataFrame[]

In [61]:
spark.sql("create table friends_list ")

AnalysisException: [NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT] CREATE Hive TABLE (AS SELECT) is not supported, if you want to enable it, please set "spark.sql.catalogImplementation" to "hive".;
'CreateTable `spark_catalog`.`learn_spark_db`.`friends_list`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists
+- Project [userID#0, name#1, age#2, friends#3, insert_ts#13]
   +- SubqueryAlias peoples
      +- View (`peoples`, [userID#0,name#1,age#2,friends#3,insert_ts#13])
         +- Sort [userID#0 ASC NULLS FIRST], true
            +- Project [userID#0, name#1, age#2, friends#3, current_timestamp() AS insert_ts#13]
               +- Filter (age#2 < 30)
                  +- Project [userID#0, name#1, age#2, friends#3]
                     +- Relation [userID#0,name#1,age#2,friends#3] csv


In [60]:
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/C:/Users/mesteves/Downloads/spark-warehouse'),
 Database(name='learn_spark_db', catalog='spark_catalog', description='', locationUri='file:/C:/Users/mesteves/Downloads/spark-warehouse/learn_spark_db.db')]