URL: https://sparkbyexamples.com/pyspark-tutorial/

- pyspark - create dataframe with examples

# RDD에서 DataFrame 생성

In [3]:
# sample data 정의
columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

In [9]:
# spark session 실행, rdd 정의
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
print(spark)
print(rdd)

<pyspark.sql.session.SparkSession object at 0x7fafbf271b50>
ParallelCollectionRDD[26] at readRDDFromFile at PythonRDD.scala:262


## toDF() 함수 사용

In [15]:
# toDF() 함수 사용
# 기존 RDD에서 dataframe을 만드는 데 사용함
# RDD에는 열이 없는데, DataFrame은 두 개의 열이 있으므로 기본 열 이름 "_1"과 "_2"로 생성됩니다.


dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
dfFromRDD1

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



DataFrame[_1: string, _2: string]

In [7]:
#DataFrame에 컬럼의 이름을 적용하려면 아래와 같이 열 이름을 받아줘야함
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



## SparkSession에서 createDataFrame() 사용하기

In [16]:
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD1.printSchema()
dfFromRDD2

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



DataFrame[language: string, users_count: string]

# List Collection에서 DataFrame 만들기

## SparkSession()에서 createDataFrame() 사용하기
- column명을 지정해주고 싶으면 toDF() 안에 *columns로 지정해줌

In [20]:
dfFromData2 = spark.createDataFrame(data).toDF(*columns)
dfFromData2.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



## Row type과 createDataFrame() 사용

In [21]:
from pyspark.sql import Row

In [22]:
rowData = map(lambda x: Row(*x), data) 
dfFromData3 = spark.createDataFrame(rowData, columns)
dfFromData3.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)



## 스키마로 DataFrame 생성

In [8]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



# 데이터 소스에서 DataFrame 만들기
- csv, text, json, xml 등
- DataFrame을 생성하려면 DataFrameReader클래스 에서 사용 가능한 적절한 메서드를 사용해야 합니다 .

## CSV에서 DataFrame 만들기

read 예시
df2 = spark.read.csv("/src/resources/file.csv")

In [86]:
common_path = "/Users/a201806014/study/pyspark_examples/"

In [88]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.read.csv(f"{common_path}/resources/zipcodes.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



In [89]:
df = spark.read.format("csv")\
.load(f"{common_path}/resources/zipcodes.csv")
# df = spark.read.format("org.apache.spark.sql.csv").load("/Users/a201806014/study/pyspark_examples/resources/zipcodes.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



### 열 이름에 헤더 레코드 사용

In [92]:
df2 = spark.read.option("header",True) \ # 헤더가 이미 있으면 명시적으로 True 전달
.csv(f"{common_path}/resources/zipcodes.csv")
df2.printSchema()

root
 |-- RecordNumber: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: string (nullable = true)
 |-- TotalWages: string (nullable = true)
 |-- Notes: string (nullable = true)



### 여러 CSV파일 읽기

In [103]:
# read.csv()방법을 사용하면 여러 csv 파일을 읽을 수도 있습니다. 
# 예를 들어 다음과 같이 쉼표를 경로로 구분하여 모든 파일 이름을 전달할 수 있습니다. 
# df = spark.read.csv("path1,path2,path3")

df2 = spark.read.csv(f"{common_path}/resources/zipcodes.csv",
                     f"{common_path}/resources/zipcodes.csv")
df2.printSchema()

ParseException: 
extraneous input '/' expecting {'ADD', 'AFTER', 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', DATABASES, 'DAY', 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 'DISTRIBUTE', 'DIV', 'DROP', 'ELSE', 'END', 'ESCAPE', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 'FETCH', 'FIELDS', 'FILTER', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'HOUR', 'IF', 'IGNORE', 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 'MAP', 'MATCHED', 'MERGE', 'MINUTE', 'MONTH', 'MSCK', 'NAMESPACE', 'NAMESPACES', 'NATURAL', 'NO', NOT, 'NULL', 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERLAY', 'OVERWRITE', 'PARTITION', 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'PLACING', 'POSITION', 'PRECEDING', 'PRIMARY', 'PRINCIPALS', 'PROPERTIES', 'PURGE', 'QUERY', 'RANGE', 'RECORDREADER', 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SCHEMA', 'SECOND', 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'SUBSTR', 'SUBSTRING', 'TABLE', 'TABLES', 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TO', 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRIM', 'TRUE', 'TRUNCATE', 'TYPE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNKNOWN', 'UNLOCK', 'UNSET', 'UPDATE', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'VIEWS', 'WHEN', 'WHERE', 'WINDOW', 'WITH', 'YEAR', IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 0)

== SQL ==
/Users/a201806014/study/pyspark_examples//resources/zipcodes.csv
^^^


### 디렉토리의 모든 CSV 파일 읽기

In [113]:
df = spark.read.option("header",True).csv(f"{common_path}/resources/")
df.printSchema()

root
 |-- {"RecordNumber":1: string (nullable = true)
 |-- "Zipcode":704: string (nullable = true)
 |-- "ZipCodeType":"STANDARD": string (nullable = true)
 |-- "City":"PARC PARQUE": string (nullable = true)
 |-- "State":"PR": string (nullable = true)
 |-- "LocationType":"NOT ACCEPTABLE": string (nullable = true)
 |-- "Lat":17.96: string (nullable = true)
 |-- "Long":-66.22: string (nullable = true)
 |-- "Xaxis":0.38: string (nullable = true)
 |-- "Yaxis":-0.87: string (nullable = true)
 |-- "Zaxis":0.3: string (nullable = true)
 |-- "WorldRegion":"NA": string (nullable = true)
 |-- "Country":"US": string (nullable = true)
 |-- "LocationText":"Parc Parque: string (nullable = true)
 |--  PR": string (nullable = true)
 |-- "Location":"NA-US-PR-PARC PARQUE": string (nullable = true)
 |-- "Decommisioned":false}: string (nullable = true)



## CSV 파일을 읽을 때의 옵션

### 구분자(separator)

In [121]:
# default separator는 쉼표 ,
df3 = spark.read.options()\
.csv(f"{common_path}/resources/zipcodes.csv")
df3

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string]

In [122]:
df3 = spark.read.options(delimiter='|')\
.csv(f"{common_path}/resources/zipcodes.csv")
df3

DataFrame[_c0: string]

In [123]:
df3 = spark.read.options(delimiter='\t')\
.csv(f"{common_path}/resources/zipcodes.csv")
df3

DataFrame[_c0: string]

### 스키마 추론
- 데이터를 기반으로 열 유형을 자동으로 유추하는 경우입니다. 스키마를 유추하려면 데이터를 한 번 더 읽어야 합니다.

In [146]:
df4 = spark.read.options(delimiter=',')\
.csv(f"{common_path}/resources/zipcodes.csv")
df4

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string]

In [147]:
df4 = spark.read.options(inferSchema='True',delimiter=',')\
.csv(f"{common_path}/resources/zipcodes.csv")
df4

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string]

In [148]:
df4 = spark.read.option("inferSchema",True)\
.option("delimiter",",")\
.csv(f"{common_path}/resources/zipcodes.csv")
df4

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string]

### header(헤더)
- 이 옵션은 CSV 파일의 첫 번째 줄을 열 이름으로 읽는 데 사용됩니다. 기본적으로 이 옵션의 값은  False 이며 모든 열 유형은 문자열로 간주됩니다.

In [150]:
df3 = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv(f"{common_path}/resources/zipcodes.csv")
df3

DataFrame[RecordNumber: int, Zipcode: int, ZipCodeType: string, City: string, State: string, LocationType: string, Lat: double, Long: double, Xaxis: double, Yaxis: double, Zaxis: double, WorldRegion: string, Country: string, LocationText: string, Location: string, Decommisioned: boolean, TaxReturnsFiled: int, EstimatedPopulation: int, TotalWages: int, Notes: string]

## 사용자 지정 스키마로 CSV 파일 읽기
- 파일의 스키마를 미리 알고있다면 inferSchema를 사용하지 않고 사용자 지정 스키마를 사용해야함

In [164]:
from pyspark.sql.types import StructType,StructField,StringType, IntegerType, DoubleType, BooleanType

# from pyspark.sql.types import StructType


schema = StructType() \
      .add("RecordNumber",IntegerType(),True) \
      .add("Zipcode",IntegerType(),True) \
      .add("ZipCodeType",StringType(),True) \
      .add("City",StringType(),True) \
      .add("State",StringType(),True) \
      .add("LocationType",StringType(),True) \
      .add("Lat",DoubleType(),True) \
      .add("Long",DoubleType(),True) \
      .add("Xaxis",IntegerType(),True) \
      .add("Yaxis",DoubleType(),True) \
      .add("Zaxis",DoubleType(),True) \
      .add("WorldRegion",StringType(),True) \
      .add("Country",StringType(),True) \
      .add("LocationText",StringType(),True) \
      .add("Location",StringType(),True) \
      .add("Decommisioned",BooleanType(),True) \
      .add("TaxReturnsFiled",StringType(),True) \
      .add("EstimatedPopulation",IntegerType(),True) \
      .add("TotalWages",IntegerType(),True) \
      .add("Notes",StringType(),True)
      
df_with_schema = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load(f"/{common_path}/resources/zipcodes.csv")

Py4JJavaError: An error occurred while calling o1291.load.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "null"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
