In [1]:
import boto3

import os

# Connection parameters: AWS credentials and region.
# Read them from environment variables instead of hard-coding keys in the notebook, where they
# would be saved into the .ipynb file and every copy of it. An unset key becomes None, which
# boto3 treats as "not given" and resolves through its standard credential lookup instead.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_DEFAULT_REGION = os.environ.get("AWS_DEFAULT_REGION", "ap-northeast-2")

### S3 클라이언트, 리소스, 세션 생성 및 연결

In [2]:
 # s3 Client 연결 함수
def s3_client_connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION):
    """Create a low-level boto3 S3 client.

    Args:
        AWS_ACCESS_KEY_ID: AWS access key id.
        AWS_SECRET_ACCESS_KEY: AWS secret access key.
        AWS_DEFAULT_REGION: AWS region name, e.g. "ap-northeast-2".

    Returns:
        The object returned by ``boto3.client(service_name="s3", ...)``.

    Raises:
        Whatever ``boto3.client`` raises. The error is deliberately not swallowed: returning
        None here only moved the failure to a confusing ``'NoneType' object has no attribute``
        error in whichever cell used the client next.
    """
    s3 = boto3.client(
        service_name="s3",
        region_name=AWS_DEFAULT_REGION,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    # NOTE(review): this only confirms the client object was built; boto3 presumably does not
    # contact S3 or check the keys until the first request (e.g. list_buckets()).
    print("s3 bucket connected!")
    return s3
    
# s3 Resource 연결 함수
def s3_resource_connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION):
    """Create a high-level boto3 S3 resource (object-oriented API: ``.Bucket(...)``, ...).

    Args:
        AWS_ACCESS_KEY_ID: AWS access key id.
        AWS_SECRET_ACCESS_KEY: AWS secret access key.
        AWS_DEFAULT_REGION: AWS region name, e.g. "ap-northeast-2".

    Returns:
        The object returned by ``boto3.resource(service_name="s3", ...)``.

    Raises:
        Whatever ``boto3.resource`` raises. It is not swallowed, so callers never receive
        None and then fail later with an unrelated-looking AttributeError.
    """
    s3 = boto3.resource(
        service_name="s3",
        region_name=AWS_DEFAULT_REGION,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    # NOTE(review): only the resource object was built; no request has been made yet.
    print("s3 bucket connected!")
    return s3
    
# s3 Session 연결 함수
def s3_session_connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION):
    """Create a boto3 Session bound to the given credentials and region.

    The session itself is not S3-specific; derive S3 objects from it with
    ``session.resource('s3')`` or ``session.client('s3')``.

    Args:
        AWS_ACCESS_KEY_ID: AWS access key id.
        AWS_SECRET_ACCESS_KEY: AWS secret access key.
        AWS_DEFAULT_REGION: AWS region name, e.g. "ap-northeast-2".

    Returns:
        The ``boto3.Session`` instance.

    Raises:
        Whatever ``boto3.Session`` raises. It is not swallowed, so callers never receive
        None and then fail on ``None.resource(...)``.
    """
    session = boto3.Session(
        region_name=AWS_DEFAULT_REGION,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    )
    print("s3 bucket connected!")
    return session

### S3에 버킷 생성

In [6]:
from glob import glob
import os
import botocore

BUCKET_NAME = "delta-lake-s3"
# Keep the bucket's LocationConstraint in sync with the client's region; S3 generally rejects
# a create_bucket whose LocationConstraint differs from the region the client talks to.
REGION = AWS_DEFAULT_REGION

# Build the client in this cell instead of relying on `client` from a later cell (In [11]),
# so the cell also works on a fresh kernel / Restart & Run All.
client = s3_client_connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)

# Create the bucket only if it does not exist yet.
try:
    client.head_bucket(Bucket=BUCKET_NAME)
except botocore.exceptions.ClientError as e:
    # A missing bucket is reported as '404'. Any other code (e.g. '403' when the name is
    # taken by another account) is a real error and must not be silently ignored.
    if e.response['Error']['Code'] != '404':
        raise
    client.create_bucket(
        Bucket=BUCKET_NAME,
        CreateBucketConfiguration={'LocationConstraint': REGION},
    )

# Print every bucket in the account to confirm the result.
buckets = client.list_buckets()
for bucket in buckets['Buckets']:
    print(bucket['Name'])


delta-lake-s3


### S3 서비스에 연결하여 사용 가능한 모든 버킷 확인

In [4]:
# List every bucket visible to these credentials, going through a boto3 Session.
session = s3_session_connection(
    AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY=AWS_SECRET_ACCESS_KEY,
    AWS_DEFAULT_REGION=AWS_DEFAULT_REGION,
)
session_s3 = session.resource('s3')  # S3 resource derived from the session

for bucket in session_s3.buckets.all():
    print(bucket.name)

s3 bucket connected!
delta-lake-s3


### AWS S3 계정에 있는 모든 버킷 목록 확인

In [11]:
# Connect with a low-level client and show the raw ListBuckets response for the account.
client = s3_client_connection(
    AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY=AWS_SECRET_ACCESS_KEY,
    AWS_DEFAULT_REGION=AWS_DEFAULT_REGION,
)
response = client.list_buckets()
response

s3 bucket connected!


{'ResponseMetadata': {'RequestId': 'ZHH88PNH0X73TXVS',
  'HostId': 'ndaRaH9V4KAh7OsjWwXwlCqTp5PJp3Udcs0sT3xm5lot1gj6exJboEmCq1SfFwUVNb97KUmS4kc=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ndaRaH9V4KAh7OsjWwXwlCqTp5PJp3Udcs0sT3xm5lot1gj6exJboEmCq1SfFwUVNb97KUmS4kc=',
   'x-amz-request-id': 'ZHH88PNH0X73TXVS',
   'date': 'Thu, 11 Apr 2024 11:32:30 GMT',
   'content-type': 'application/xml',
   'transfer-encoding': 'chunked',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'Buckets': [{'Name': 'delta-lake-s3',
   'CreationDate': datetime.datetime(2024, 4, 10, 8, 10, 47, tzinfo=tzutc())}],
 'Owner': {'ID': 'a90479c1d3ffd98295c4063a3f2a08683d70179bf20da90a905b1ac9e7895834'}}

### AWS S3 버킷에 파일 업로드 및 버킷에서 파일 다운로드

In [7]:
# Upload File
def upload_file(local_path, bucket_name, key, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION):
    """Upload one local file to S3, best effort (errors are printed, not raised).

    Args:
        local_path: Path of the local file to upload.
        bucket_name: Name of the destination bucket.
        key: Destination path (object key) inside the bucket.
        AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION: Forwarded to
            ``s3_resource_connection``.

    Returns:
        None. Prints a completion message on success, or the exception text on failure.
    """
    try:
        resource = s3_resource_connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
        transfer_client = resource.meta.client  # low-level client behind the resource
        transfer_client.upload_file(local_path, bucket_name, key)
    except Exception as err:
        print(err)
        return None
    print("complete Save File to S3")
        
# Download File
def donwload_file(local_path, bucket_name, key, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION):
    """Download ``s3://<bucket_name>/<key>`` to the local machine, best effort.

    The misspelled name ``donwload_file`` is kept so existing calls keep working.

    Args:
        local_path: Destination file path. If it is an existing directory, the file is saved
            inside it under the key's base name, because boto3's ``download_file`` needs a
            file path, not a folder.
        bucket_name: Bucket that holds the object.
        key: Full object key (path) inside the bucket.
        AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION: Forwarded to
            ``s3_resource_connection``.

    Returns:
        None. Errors are printed rather than raised. A missing key is reported as not found
        instead of being announced as a success.
    """
    import os

    if os.path.isdir(local_path):
        local_path = os.path.join(local_path, os.path.basename(key))

    try:
        s3 = s3_resource_connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
        bucket = s3.Bucket(bucket_name)
        # A prefix listing can also return longer keys that merely start with `key`,
        # so require an exact match before downloading.
        objects = list(bucket.objects.filter(Prefix=key))
        found = any(obj.key == key for obj in objects)
        if found:
            bucket.download_file(key, local_path)
    except Exception as e:
        print(e)
        return

    if found:
        print("complete Download File from S3")
    else:
        print(f"not found: s3://{bucket_name}/{key}")

In [12]:
# Parameters shared by the upload / download cells.
bucket_name = "delta-lake-s3"
key = "delta-data/brewery_data_complete_extended.csv"  # object key (path) inside the bucket

# Upload parameter: local file to send to S3.
Upload_path = "./brewery_data_complete_extended.csv"

# Download parameter: must be a *file* path, because boto3's download_file writes to exactly
# this path. A Windows folder such as "C:/Users/..." is neither a file path nor usable from the
# Linux kernel this notebook runs on (the Spark logs show /home/... paths). A distinct file name
# avoids overwriting the local source file used for the upload.
Download_path = "./downloaded_brewery_data_complete_extended.csv"

In [11]:
import boto3

# Upload the local CSV to s3://<bucket_name>/<key> through an S3 resource.
# Only the resource is needed here; no separate low-level client is created, so the
# boto3 Session stored in `session` by the bucket-listing cell is left untouched.
s3_resource = s3_resource_connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)

bucket = s3_resource.Bucket(bucket_name)
bucket.upload_file(Upload_path, key)

s3 bucket connected!
s3 bucket connected!


In [9]:
# Download cell: fetch s3://<bucket_name>/<key> into Download_path.
donwload_file(
    local_path=Download_path,
    bucket_name=bucket_name,
    key=key,
    AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY=AWS_SECRET_ACCESS_KEY,
    AWS_DEFAULT_REGION=AWS_DEFAULT_REGION,
)

### S3에 있는 파일 읽어오기

In [5]:
import pandas as pd
import io

# Low-level S3 client used to inspect the bucket "directory" and read objects below.
s3_client = s3_client_connection(
    AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY=AWS_SECRET_ACCESS_KEY,
    AWS_DEFAULT_REGION=AWS_DEFAULT_REGION,
)

s3 bucket connected!


In [6]:
# Prefix ("directory") inside the bucket to inspect.
HO = "delta-data/"

HO_info = s3_client.list_objects(Bucket = bucket_name, Prefix = HO, Delimiter = '/')

# 'Contents' lists the objects under the prefix. S3 omits the key entirely when nothing
# matches, so default to an empty list instead of raising KeyError.
# NOTE: a single list_objects call returns at most 1,000 keys; use a paginator for larger prefixes.
for content in HO_info.get('Contents', []):
    print(content['Key'])  # object key = full path inside the bucket

delta-data/
delta-data/brewery_data_complete_extended.csv


In [7]:
# Read the CSV object from S3 into a pandas DataFrame.
key = "delta-data/brewery_data_complete_extended.csv"
# Optional row cap: None reads the whole file. Reading the full file exhausted memory on the
# machine this was run on, so set e.g. 100_000 to load only a preview; the full dataset is
# processed with Spark further below.
CSV_PREVIEW_ROWS = None
obj = s3_client.get_object(Bucket = bucket_name, Key = key)
# Hand the streaming body straight to read_csv instead of .read()-ing the entire object into
# one bytes value first; this presumably lowers peak memory by roughly the raw file size.
test_df = pd.read_csv(obj["Body"], nrows=CSV_PREVIEW_ROWS)
test_df

: 

### SparkSession 생성 및 AWS S3와 연동

In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

import os

# Credentials for the s3a filesystem, read from the environment rather than hard-coded in the
# notebook. s3_connect_spark puts them into the Spark configuration, so export
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY before starting the kernel.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_DEFAULT_REGION = os.environ.get("AWS_DEFAULT_REGION", "ap-northeast-2")

# spark session 생성시 aws와 연동하기
def s3_connect_spark(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY_ID):
    """Create (or reuse) a SparkSession with Delta Lake and S3 (s3a://) access configured.

    Args:
        AWS_ACCESS_KEY_ID: Access key id for the s3a filesystem, or None to leave
            ``spark.hadoop.fs.s3a.access.key`` unset.
        AWS_SECRET_ACCESS_KEY_ID: The AWS *secret* access key (parameter name kept for
            existing callers), or None to leave ``spark.hadoop.fs.s3a.secret.key`` unset.

    Returns:
        The session from ``SparkSession.builder.config(conf=...).getOrCreate()``.
    """
    conf = (
        SparkConf()
        .setAppName("MY_APP")  # Spark application name
        # Delta Lake plus the Hadoop S3A connector (hadoop-aws brings in the AWS Java SDK bundle)
        .set("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1,org.apache.hadoop:hadoop-aws:3.3.1")
        # Disable Delta Lake's retention-duration safety check
        .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
        # Enable Delta Lake's SQL extensions and make Delta the session catalog
        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        # Map columns by name so column names may contain spaces / special characters
        .set("spark.databricks.delta.properties.defaults.columnMapping.mode", "name")
        # Number of *rows* (not columns) per batch in the vectorized Parquet reader
        # (Spark's default is 4096)
        .set('spark.sql.parquet.columnarReaderBatchSize', 100)
        .set("spark.executor.memory", "8g")
        # NOTE(review): no master is set, so Spark presumably runs in local mode, where tasks run
        # inside the driver JVM and the usable heap comes from spark.driver.memory rather than
        # spark.executor.memory (the MemoryManager warnings in this notebook report ~1.8 GB of
        # heap, consistent with 2g).
        .set("spark.driver.memory", "2g")
    )

    # Use the arguments, not module-level globals, for the s3a keys. Unset (None) keys are
    # skipped because SparkConf would otherwise stringify them into the literal "None".
    if AWS_ACCESS_KEY_ID is not None:
        conf.set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    if AWS_SECRET_ACCESS_KEY_ID is not None:
        conf.set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY_ID)

    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark


In [2]:
# Start (or reuse) the Delta- and S3-enabled SparkSession.
spark = s3_connect_spark(
    AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY_ID=AWS_SECRET_ACCESS_KEY,
)

your 131072x1 screen size is bogus. expect trouble


24/04/11 22:05:17 WARN Utils: Your hostname, Kimbugeon resolves to a loopback address: 127.0.1.1; using 172.22.182.179 instead (on interface eth0)
24/04/11 22:05:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/bugeon/deltalakeenv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/bugeon/.ivy2/cache
The jars for the packages stored in: /home/bugeon/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0086c7f6-5a95-41ae-a902-6d303a83593f;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.1 in central
	found io.delta#delta-storage;2.1.1 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 181ms :: artifacts dl 6ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	io.delta#delta-core_2.12;2.1.1 from central in [default]
	io.delta#delta-storage;2.1.1 from central in [default]
	org.antlr#antlr4

24/04/11 22:05:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### S3에서 CSV 파일 읽어온 후 Spark DataFrame으로 변환

In [5]:
# Enable AWS Signature Version 4 for the S3 SDK before the first s3a:// access.
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# The s3a:// scheme selects Hadoop's S3A filesystem connector.
s3_data_path = "s3a://delta-lake-s3/delta-data/brewery_data_complete_extended.csv"

# Read the CSV from S3 into a Spark DataFrame, the starting point for the PySpark processing below.
#   header      -> the first row holds the column names
#   inferSchema -> let Spark infer column types (this costs an extra pass over the file)
df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema="true")
    .load(s3_data_path)
)

24/04/11 21:55:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

### S3에 CSV 파일을 Delta 테이블로 저장

In [5]:
# Write the DataFrame to S3 as a Delta table; this first commit becomes table version 0.
# NOTE(review): no save mode is given, so Spark's default (error if the path already exists)
# applies and re-running this cell fails. That presumably is desirable here, since the cells
# below assume versions 0/1/2 are raw/compaction/Z-order.
save_path = "s3a://delta-lake-s3/delta-test"
df.write.save(save_path, format="delta")

[Stage 2:>                                                        (0 + 16) / 20]

24/04/11 18:38:15 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers
24/04/11 18:38:15 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers
24/04/11 18:38:15 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
24/04/11 18:38:46 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers


[Stage 2:==>                                                      (1 + 16) / 20]

24/04/11 18:38:47 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
24/04/11 18:39:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers


[Stage 2:=====>                                                   (2 + 16) / 20]

24/04/11 18:39:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
24/04/11 18:39:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers




24/04/11 18:39:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
24/04/11 18:39:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers




24/04/11 18:39:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 84.45% for 16 writers
24/04/11 18:39:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 90.08% for 15 writers




24/04/11 18:39:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,813,485,955 bytes) of heap memory
Scaling row group sizes to 96.51% for 14 writers


                                                                                

### S3에 delta lake 형식으로 저장된 데이터를 PySpark에서 읽어들여 DataFrame으로 변환

In [4]:
from delta.tables import *
from pyspark.sql.functions import *

s3table_path = "s3a://delta-lake-s3/delta-test"

# Handle to the Delta table stored at that S3 path.
DTable = DeltaTable.forPath(spark, s3table_path)
print(DTable)

# A regular Spark DataFrame over the Delta table, for analysis/processing in PySpark.
test = DTable.toDF()
print(type(test))

test.show()

24/04/11 22:06:34 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

<delta.tables.DeltaTable object at 0x7fafe533a380>
<class 'pyspark.sql.dataframe.DataFrame'>




+--------+-------------------+----------+-------+---------------+-----------------+------------------+------------------+------------------+------------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+----------------------------+
|Batch_ID|          Brew_Date|Beer_Style|    SKU|       Location|Fermentation_Time|       Temperature|          pH_Level|           Gravity|   Alcohol_Content|Bitterness|Color|Ingredient_Ratio|Volume_Produced|       Total_Sales|    Quality_Score|Brewhouse_Efficiency|Loss_During_Brewing|Loss_During_Fermentation|Loss_During_Bottling_Kegging|
+--------+-------------------+----------+-------+---------------+-----------------+------------------+------------------+------------------+------------------+----------+-----+----------------+---------------+------------------+-----------------+--------------------+-------------------+------------------------+----

                                                                                

### 특정 버전을 읽어와 DataFrame 생성

In [7]:
# Version 0 -> the raw data as originally written.
# versionAsOf is Delta Lake time travel: read the table as of a specific version.
df0 = spark.read.load(s3table_path, format="delta", versionAsOf=0)

In [8]:
# Version 1 -> OPTIMIZE with compaction (merge small files into larger ones).
# Use the DeltaTable name imported by `from delta.tables import *` directly, as the table-loading
# cell does; `delta.DeltaTable` presumably only resolved because the `delta` package name leaks
# through that wildcard import.
(
    DeltaTable.forPath(spark, s3table_path)
    .optimize()            # start an OPTIMIZE on the table
    .executeCompaction()   # merge files; the read below expects this to produce version 1
)

df1 = spark.read.format("delta").option("versionAsOf", 1).load(s3table_path)

In [5]:
# Version 2 -> Z-Ordering, applied on top of the version-1 (compacted) table.
# Use the DeltaTable name imported by `from delta.tables import *` directly; `delta.DeltaTable`
# presumably only resolved because the `delta` package name leaks through that wildcard import.
(
    DeltaTable.forPath(spark, s3table_path)
    .optimize()
    .executeZOrderBy("Batch_ID")  # cluster rows with similar Batch_ID values into the same files
)
# NOTE(review): the benchmark queries on V0/V1/V2 filter on SKU, not Batch_ID, so file skipping
# from this Z-order is not expected to help them; Z-order by SKU (or filter on Batch_ID) to
# measure its effect.

                                                                                

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>>]

## 임시 뷰 생성 후 SQL 쿼리로 데이터 조회 및 수집

### version 0 : 최적화 X (원본 데이터)

In [20]:

# Register table version 0 (no optimization) as the temporary view "V0".
(
    spark.read
    .option("versionAsOf", "0")
    .format("delta")
    .load(s3table_path)
    .createOrReplaceTempView("V0")
)


In [33]:
%%time

# Select the rows of V0 with SKU = 'Cans' and collect them all to the driver.
# NOTE(review): collect() ships every matching row into Python, so the %%time figure also
# includes that transfer, not only the Delta scan.
spark.sql("select * from V0 where SKU = 'Cans'").collect()

                                                                                

CPU times: user 18.7 s, sys: 4.7 s, total: 23.4 s
Wall time: 1min


[Row(Batch_ID=7106740, Brew_Date=datetime.datetime(2023, 1, 28, 15, 32, 27), Beer_Style='Wheat Beer', SKU='Cans', Location='Rajajinagar', Fermentation_Time=10, Temperature=24.772118587167665, pH_Level=5.488030880155754, Gravity=1.0682252360401157, Alcohol_Content=5.183063333013349, Bitterness=40, Color=14, Ingredient_Ratio='1:0.42:0.19', Volume_Produced=2326, Total_Sales=2146.4628187465187, Quality_Score=9.281348697448694, Brewhouse_Efficiency=80.66475619167915, Loss_During_Brewing=3.702237091947312, Loss_During_Fermentation=4.985811413947611, Loss_During_Bottling_Kegging=1.8877181721501861),
 Row(Batch_ID=9478056, Brew_Date=datetime.datetime(2023, 1, 28, 15, 32, 27), Beer_Style='IPA', SKU='Cans', Location='Jayanagar', Fermentation_Time=14, Temperature=15.757073121076877, pH_Level=5.001031609152168, Gravity=1.0373075862190424, Alcohol_Content=5.139507901326498, Bitterness=54, Color=19, Ingredient_Ratio='1:0.38:0.11', Volume_Produced=1711, Total_Sales=8857.688642991043, Quality_Score=9.

### version 1 : 최적화 O (Compaction)

In [10]:

# Register table version 1 (after compaction) as the temporary view "V1".
(
    spark.read
    .option("versionAsOf", "1")
    .format("delta")
    .load(s3table_path)
    .createOrReplaceTempView("V1")
)


In [11]:
%%time

# Select the rows of V1 with SKU = 'Cans' and collect them all to the driver.
# NOTE(review): collect() ships every matching row into Python, so the %%time figure also
# includes that transfer, not only the Delta scan.
spark.sql("select * from V1 where SKU = 'Cans'").collect()

                                                                                

CPU times: user 16 s, sys: 1.2 s, total: 17.2 s
Wall time: 57.8 s


[Row(Batch_ID=7106740, Brew_Date=datetime.datetime(2023, 1, 28, 15, 32, 27), Beer_Style='Wheat Beer', SKU='Cans', Location='Rajajinagar', Fermentation_Time=10, Temperature=24.772118587167665, pH_Level=5.488030880155754, Gravity=1.0682252360401157, Alcohol_Content=5.183063333013349, Bitterness=40, Color=14, Ingredient_Ratio='1:0.42:0.19', Volume_Produced=2326, Total_Sales=2146.4628187465187, Quality_Score=9.281348697448694, Brewhouse_Efficiency=80.66475619167915, Loss_During_Brewing=3.702237091947312, Loss_During_Fermentation=4.985811413947611, Loss_During_Bottling_Kegging=1.8877181721501861),
 Row(Batch_ID=9478056, Brew_Date=datetime.datetime(2023, 1, 28, 15, 32, 27), Beer_Style='IPA', SKU='Cans', Location='Jayanagar', Fermentation_Time=14, Temperature=15.757073121076877, pH_Level=5.001031609152168, Gravity=1.0373075862190424, Alcohol_Content=5.139507901326498, Bitterness=54, Color=19, Ingredient_Ratio='1:0.38:0.11', Volume_Produced=1711, Total_Sales=8857.688642991043, Quality_Score=9.

### version 2 : 최적화 O (Z-Ordering, `Batch_ID` 기준)

In [6]:

# Register table version 2 (after Z-ordering) as the temporary view "V2".
(
    spark.read
    .option("versionAsOf", "2")
    .format("delta")
    .load(s3table_path)
    .createOrReplaceTempView("V2")
)


In [7]:
%%time

# Select the rows of V2 with SKU = 'Cans' and collect them all to the driver.
# NOTE(review): collect() ships every matching row into Python, so the %%time figure also
# includes that transfer, not only the Delta scan.
spark.sql("select * from V2 where SKU = 'Cans'").collect()

                                                                                

CPU times: user 16.5 s, sys: 931 ms, total: 17.4 s
Wall time: 55.2 s


[Row(Batch_ID=7106740, Brew_Date=datetime.datetime(2023, 1, 28, 15, 32, 27), Beer_Style='Wheat Beer', SKU='Cans', Location='Rajajinagar', Fermentation_Time=10, Temperature=24.772118587167665, pH_Level=5.488030880155754, Gravity=1.0682252360401157, Alcohol_Content=5.183063333013349, Bitterness=40, Color=14, Ingredient_Ratio='1:0.42:0.19', Volume_Produced=2326, Total_Sales=2146.4628187465187, Quality_Score=9.281348697448694, Brewhouse_Efficiency=80.66475619167915, Loss_During_Brewing=3.702237091947312, Loss_During_Fermentation=4.985811413947611, Loss_During_Bottling_Kegging=1.8877181721501861),
 Row(Batch_ID=9478056, Brew_Date=datetime.datetime(2023, 1, 28, 15, 32, 27), Beer_Style='IPA', SKU='Cans', Location='Jayanagar', Fermentation_Time=14, Temperature=15.757073121076877, pH_Level=5.001031609152168, Gravity=1.0373075862190424, Alcohol_Content=5.139507901326498, Bitterness=54, Color=19, Ingredient_Ratio='1:0.38:0.11', Volume_Produced=1711, Total_Sales=8857.688642991043, Quality_Score=9.