In [1]:
! mkdir -p ~/aiffel/bigdata_ecosystem
! ln -s ~/data/* ~/aiffel/bigdata_ecosystem

In [3]:
import pyspark
pyspark.__version__

'3.0.1'

## 여러코드 


### CODE1


In [4]:
from pyspark import SparkConf, SparkContext

sc = SparkContext()
sc

In [5]:
type(sc)

pyspark.context.SparkContext

In [6]:
#에러 발생을 확인하세요!
new_sc = SparkContext()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-4-09e2fe521565>:3 

In [7]:
sc.stop()

### CODE 2 With Configuration

In [8]:
sc = SparkContext(master='local', appName='PySpark Basic')
sc

In [9]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.app.name', 'PySpark Basic'),
 ('spark.driver.host', 'whjm0aktx8cc0bdymzgdc77hd-64bd9f8f44-mwr9m'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.driver.port', '34893'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1637719033727'),
 ('spark.ui.showConsoleProgress', 'true')]

In [10]:
sc.master

'local'

In [11]:
sc.appName

'PySpark Basic'

In [12]:
sc.stop()

### CODE 3 With Spark Conf()

In [15]:
conf = SparkConf().setAppName('PySpark Basic').setMaster('local')# 어플리케이션의 이름과 Master의 URL설정
sc = SparkContext(conf=conf)
sc

In [16]:
sc.stop()

## RDD Creation


In [17]:
from pyspark import SparkConf, SparkContext

sc = SparkContext()

rdd = sc.parallelize([1,2,3])

rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

In [18]:
type(rdd)

pyspark.rdd.RDD

In [19]:
rdd.take(3)

[1, 2, 3]

#### 외부 파일 로드

- .textFile()함수 사용

In [20]:
import os

file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/test.txt'
with open(file_path, 'w') as f:
    for i in range(10):
        f.write(str(i)+'\n')
        
print('OK')

OK


In [21]:
rdd2 = sc.textFile(file_path)
print(rdd2)
print(type(rdd2))

/aiffel/aiffel/bigdata_ecosystem/test.txt MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0
<class 'pyspark.rdd.RDD'>


In [22]:
x = sc.parallelize(["b", "a", "c", "d"])
y = x.map(lambda z: (z, 1))
print(x.collect()) #collect()는 actions입니다. 
print(y.collect())

['b', 'a', 'c', 'd']
[('b', 1), ('a', 1), ('c', 1), ('d', 1)]


### RDD Transformations


In [23]:
#map
x = sc.parallelize(["b", "a", "c", "d"])
y = x.map(lambda z: (z, 1))
print(x.collect()) #collect()는 actions입니다. 
print(y.collect())

['b', 'a', 'c', 'd']
[('b', 1), ('a', 1), ('c', 1), ('d', 1)]


In [24]:
nums = sc.parallelize([1, 2, 3])
squares = nums.map(lambda x: x*x)
print(squares.collect())

[1, 4, 9]


In [25]:
#filter
x = sc.parallelize([1,2,3,4,5])
y = x.filter(lambda x: x%2 == 0) 
print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5]
[2, 4]


In [26]:
text = sc.parallelize(['a', 'b', 'c', 'd'])
capital = text.map(lambda x: x.upper())
A = capital.filter(lambda x: 'A' in x)
print(text.collect())
print(A.collect())

['a', 'b', 'c', 'd']
['A']


In [27]:
#flatmap
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*10, 30))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 10, 30, 2, 20, 30, 3, 30, 30]


In [28]:
wordsDataset = sc.parallelize(["Spark is funny", "It is beautiful", "And also It is implemented by python"])
result = wordsDataset.flatMap(lambda x: x.split()).filter(lambda x: x != " ").map(lambda x: x.lower())
# 공백은 제거합니다.
# 단어를 공백기준으로 split 합니다. 
result.collect()

['spark',
 'is',
 'funny',
 'it',
 'is',
 'beautiful',
 'and',
 'also',
 'it',
 'is',
 'implemented',
 'by',
 'python']

#### csv파일 읽기

In [29]:
import os
csv_path = os.getenv('HOME')+'/data/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_0.take(5)

['survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone',
 '0,male,22.0,1,0,7.25,Third,unknown,Southampton,n',
 '1,female,38.0,1,0,71.2833,First,C,Cherbourg,n',
 '1,female,26.0,0,0,7.925,Third,unknown,Southampton,y',
 '1,female,35.0,1,0,53.1,First,C,Southampton,n']

In [30]:
# 비어있는 라인은 제외하고, delimeter인 ,로 line을 분리해 줍니다. 
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
csv_data_1.take(5)

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone'],
 ['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n']]

In [31]:
columns = csv_data_1.take(1)
columns

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone']]

In [32]:
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())  # 첫 번째 컬럼이 숫자인 것만 필터링
csv_data_2.take(5)

[['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n'],
 ['0',
  'male',
  '28.0',
  '0',
  '0',
  '8.4583',
  'Third',
  'unknown',
  'Queenstown',
  'y']]

In [33]:
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark import SparkFiles

url = 'https://storage.googleapis.com/tf-datasets/titanic/train.csv'
sc.addFile(url)
sqlContext = SQLContext(sc)

df = sqlContext.read.csv(SparkFiles.get("train.csv"), header=True, inferSchema= True)
df.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class|deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|0       |male  |22.0|1                 |0    |7.25   |Third|unknown|Southampton|n    |
|1       |female|38.0|1                 |0    |71.2833|First|C      |Cherbourg  |n    |
|1       |female|26.0|0                 |0    |7.925  |Third|unknown|Southampton|y    |
|1       |female|35.0|1                 |0    |53.1   |First|C      |Southampton|n    |
|0       |male  |28.0|0                 |0    |8.4583 |Third|unknown|Queenstown |y    |
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
only showing top 5 rows



In [34]:
# 위에서 얻은 데이터에서 40세 이상인 사람들의 데이터만 필터링해 봅시다. 
df2 = df[df['age']>40]
df2.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class |deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|0       |male  |66.0|0                 |0    |10.5   |Second|unknown|Southampton|y    |
|0       |male  |42.0|1                 |0    |52.0   |First |unknown|Southampton|n    |
|1       |female|49.0|1                 |0    |76.7292|First |D      |Cherbourg  |n    |
|0       |male  |65.0|0                 |1    |61.9792|First |B      |Cherbourg  |n    |
|0       |male  |45.0|1                 |0    |83.475 |First |C      |Southampton|n    |
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
only showing top 5 rows



#### RDD Operation -Actions

In [35]:
#collect
nums.take(3)

[1, 2, 3]

In [36]:
nums.take(3)

[1, 2, 3]

In [37]:
#count
nums.count()

3

In [38]:
#reduce
nums.reduce(lambda x, y: x + y)

6

In [39]:
file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/file.txt'
nums.saveAsTextFile(file_path)

!ls -l ~/aiffel/bigdata_ecosystem

Py4JJavaError: An error occurred while calling o404.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/aiffel/aiffel/bigdata_ecosystem/file.txt already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:289)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1552)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1552)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1538)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1538)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:549)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [40]:
# RDD 생성
rdd = sc.parallelize(range(1,100))

# RDD Transformation 
rdd2 = rdd.map(lambda x: 0.5*x - 10).filter(lambda x: x > 0)

# RDD Action 
rdd2.reduce(lambda x, y: x + y)

1580.0