In [5]:
# Apache Spark의 api를 사용하여 데이터프레임을 생성 가능함
# SparkSession을 통해 기능에 접근하여 데이터 프레임을 만듬

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = (SparkSession
         .builder
         .appName('AuthorsAges')
         .getOrCreate())

data_df = spark.createDataFrame([('Brooke', 20), ('Denny', 31), ('Jules', 30),
                                 ('TD', 35), ('Brooke', 25)], ['name', 'age'])

avg_df = data_df.groupBy('name').agg(avg('age'))
avg_df.show()

24/10/30 11:08:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+



                                                                                

In [16]:
# 또한, 스키마 정의를 통해 DataFrame의 구조를 명시할 수 있음
# 스키마는 각 열의 이름과 데이터 타입을 포함

from pyspark.sql.types import *

schema = StructType([StructField('author', StringType(), False),
                     StructField('title', StringType(), False),
                     StructField('gages', IntegerType(), False)])

In [15]:
from pyspark.sql import SparkSession

# DDL을 사용하여 스키마를 정의
schema = "Id INT, First STRING, Last STRING, Url STRING, Published STRING, Hits INT, Campaigns ARRAY<STRING>"

# 기본 데이터 생성
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
]

# 메인 프로그램
if __name__ == "__main__":
    spark = SparkSession.builder.appName("Example-3_6").getOrCreate()
    blogs_df = spark.createDataFrame(data, schema)
    blogs_df.show()
    blogs_df.printSchema() # -> 스키마 출력

24/10/30 11:13:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (

In [8]:
# Row 객체를 사용하여 개별 데이터를 표현
# Row는 Spark DataFrame의 한 행을 나타냄

from pyspark.sql import Row

blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"])
blog_row[1]

'Reynold'

In [9]:
# Row 객체를 사용하여 구성된 리스트를 통해 DataFrame을 생성할 수 있음
# 각 Row는 열 데이터를 포함하며, 이 리스트를 Spark의 createDataFrame 메서드에 전달함

rows = [Row('Matei Zaharia', 'CA'), Row('Reynold Xin', 'CA')]
authors_df = spark.createDataFrame(rows, ['Authors', 'State'])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



In [10]:
# 프로그래밍적으로 스키마를 정의하여 DataFrame을 생성
# 스키마 정의는 데이터의 구조를 명시하여 데이터 처리를 용이하게 함

from pyspark.sql.types import *  
from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder.appName("Fire Calls").getOrCreate()

# 스키마 정의
fire_schema = StructType([
    StructField('CallNumber', IntegerType(), True),
    StructField('UnitID', StringType(), True),
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('CallDate', StringType(), True),
    StructField('WatchDate', StringType(), True),
    StructField('CallFinalDisposition', StringType(), True),
    StructField('AvailableDtTm', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Zipcode', IntegerType(), True),
    StructField('Battalion', StringType(), True),
    StructField('StationArea', StringType(), True),
    StructField('Priority', StringType(), True),
    StructField('FinalPriority', IntegerType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True),
    StructField('NumAlarms', IntegerType(), True),
    StructField('UnitType', StringType(), True),
    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
    StructField('FirePreventionDistrict', StringType(), True),
    StructField('SupervisorDistrict', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('RowID', StringType(), True),
    StructField('Delay', FloatType(), True)
])

# DataFrameReader 인터페이스로 CSV 파일을 읽어옵니다.
sf_fire_file = "./data/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

# 데이터프레임의 내용 확인
fire_df.show(3)

# 데이터프레임의 스키마 확인
fire_df.printSchema()

24/10/30 11:08:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/10/30 11:08:09 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+------------+--------+--------------------+-----+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood|Location|               RowID|Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+-------

24/10/30 11:08:09 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 28, schema size: 26
CSV file: file:///home/seungmin/study/spark/data/sf-fire-calls.csv


In [11]:
# 데이터프레임을 파케이 형식으로 저장 가능
# Parquet는 컬럼 지향 저장 포맷으로 효율적인 데이터 처리를 지원

# parquet_path = ...
# fire_df.write.format('parquet').save(parquet_path

In [17]:
# DataFrame에서 특정 조건을 만족하는 행을 필터링할 수 있음
# select를 통해 선택한 열을 지정하고, where를 통해 필터링 조건을 설정함

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

few_fire_df = (fire_df
    .select('IncidentNumber', 'AvailableDtTm', 'CallType')
    .where(col('CallType') != 'Medical Incident')) 

few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [13]:
# 예시

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct

distinct_call_types_count = (fire_df
    .select("CallType")
    .where(col("CallType").isNotNull())  # Null이 아닌 값만 필터링
    .agg(countDistinct("CallType").alias("DistinctCallTypes"))  # 고유한 'CallType'의 개수 세기
)

distinct_call_types_count.show()



+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



                                                                                

In [29]:
# 예시2

(fire_df
    .select("CallType")
    .where(col("CallType").isNotNull())
    .distinct()
    .show(10, False)
)

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Marine Fire                  |
|Aircraft Emergency           |
|Administrative               |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Watercraft in Distress       |
|Explosion                    |
+-----------------------------+
only showing top 10 rows



In [14]:
# 열 이름을 변경할 수 있음
# 열 이름 변경 후, 바로 해당 열을 통해서 데이터를 가져올 수 있음

new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")

(new_fire_df
    .select("ResponseDelayedinMins")  # "ResponseDelayedinMins" 열 선택
    .where(col("ResponseDelayedinMins") > 5)
    .show(5, False)
)

24/10/30 11:09:13 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Location
 Schema: Delay
Expected: Delay but found: Location
CSV file: file:///home/seungmin/study/spark/data/sf-fire-calls.csv


+---------------------+
|ResponseDelayedinMins|
+---------------------+
+---------------------+

