In [1]:
'''
標準輸入(stdin)
標準輸出(stdout)
'''
# sys.stdin & sys.stdout
# egrep.py
import sys, re

# sys.argv 是一個列表，包含所有命令參數
# sys.argv[0] 是程式自己的名稱
# sys.argv[1] 命令列中的正規表達式
regex = sys.argv[1]

# 針對傳遞到 script 腳本中的每一行
for line in sys.stdin:
    # 如果符合正規表達式，就寫進 stdout
    if re.search(regex, line):
        sys.stdout.write(line)

# line_count.py
import sys

count = 0
for line in sys.stdin:
    count += 1
    
# print 把結果輸出到 stdout
print("count :", count)

count : 0


In [6]:
# | 代表管道符號，把左邊指令的輸出當成右邊指令的輸入
print(type(SomeFile.txt | python egrep.py "[0-9]" | python line_count.py))

SyntaxError: invalid syntax (<ipython-input-6-19bf71771f25>, line 2)

In [7]:
# most_common_words.py
import sys
from collections import Counter

# 第一個參數，代表的是想要列出的單詞
try:
    num_words = int(sys.argv[1])
except:
    print("usage: most_common_words.py num_words")
    sys.exit(1)    # 非零的 exit 碼代表有錯誤
    
counter = Counter(word.lower()                      # 單詞全轉成小寫
                 for line in sys.stdin              # 針對標準輸入的每一行
                 for word in line.strip().split()   # 用空格來進行切分
                 if word)                           # 遇到空單詞就跳過

for word, count in counter.most_common(num_words):
    sys.stdout.write(str(count))
    sys.stdout.write("\t")
    sys.stdout.write(word)
    sys.stdout.write("\n")

usage: most_common_words.py num_words


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [10]:
'''
文字檔案
    用 open()
'''
# 'r'表示唯獨
file_for_reading = open('reading_file.txt', 'r')      # open() 裡面放檔案的路徑和名稱

# 'w'表示可寫入 (write) -- 若原檔存在，就會破壞原檔案內容！
file_for_writing = open('writing_file.txt', 'w')

# 'a'表示可加掛 (append) -- 會將新資料加掛到檔案末端
file_for_appending = open('appending_txt', 'a')

# 完成後關閉 (close)
file_for_writing.close()

FileNotFoundError: [Errno 2] No such file or directory: 'reading_file.txt'

In [13]:
'''
with 區塊的方式
    開啟檔案後會自動關閉
'''
with open(filename, 'r') as f:
    data = function_that_gets_data_from(f)
    # 執行到這裡已關閉檔案
    process(data)
    
    
# 讀取整個文字檔
# 利用 for迴圈
starts_with_hash = 0

with open('input.txt', 'r') as f:
    for line in f:                   # 逐行處理檔案中的每一行內容
        if re.match("^#", line):     # 用正規化檢查開頭是否為 '#'
            starts_with_hash += 1    # 若是就加 1
            
# 取出 email中帶有 '@' 的字串
def get_domain(email_address):
    '''
    用 '@' 來進行切分，並送回最後那段文字
    '''
    return email_address.lower().split("@")[-1]

with open('email_addresses.txt', 'r') as f:
    domain_counts = Counter(get_domain(line.strip())
                           for line in f
                           if '@' in line)

NameError: name 'filename' is not defined

In [20]:
'''
切分檔案內容
'''
# 在 r或 w後面加上一個 b以二進位處理
# csv.reader 以迭代方式取出每一行資料
import csv

with open('tab_delimited_stock_prices.txt', 'rb') as f:
    reader = csv.reader(f, delimiter = '\t')
    for row in reader:
        date = row[0]
        symbol = row[1]
        closing_price = float(row[2])
        process(data, symbol, closing_price)

Error: iterator should return strings, not bytes (did you open the file in text mode?)

In [24]:
import csv

with open('colon_delimited_stock_prices.txt', 'rb') as f:
    reader = csv.DictReader(f, delimiter = ':')
    for row in reader:
        date = row["date"]
        symbol = row["symbol"]
        closing_price = float(row["closing_price"])
        process(data, symbol, closing_price)

Error: iterator should return strings, not bytes (did you open the file in text mode?)

In [26]:
# 使用 csv.writer 將區分好的資料顯示
today_prices = { 'AAPL' : 90.91, 'MSFT': 41.68, 'FB' : 64.5 }

with open('comma_delimited_stock_prices.txt', 'wb') as f:
    writer = csv.writer(f, delimiter= ',')
    for stock, price in today_prices.items():
        writer.writerow([stock, price])

TypeError: a bytes-like object is required, not 'str'

In [29]:
results = [["test1", "success", "Monday"],
          ["test2", "success kind of", "Tuesday"],
          ["test3", "failure, kind of", "Wednesday"],
          ["test4", "failure, utter", "Thursday"]]

# 避免以下作法
with open('bad_csv.txt', 'wb') as f:
    for row in results:
        f.write(",".join(map(str, row)))     # 可能有太多逗號
        f.write("\n")                        # 同一行資料中，可能有新行的符號

TypeError: a bytes-like object is required, not 'str'

In [31]:
'''
從 Web 網路擷取數據資料
'''
# 使用 BeautifulSoup() 和 requests.get
from bs4 import BeautifulSoup
import requests
html = requests.get("http://www.example.com").text
soup = BeautifulSoup(html, 'html5lib')

first_paragraph = soup.find('p')    # 也可只寫 soup.p
print("first_paragraph :", first_paragraph)
print("---------------")

# 使用 text 屬性，取得 Tag 物件
first_paragraph_text = soup.p.text
first_paragraph_words = soup.p.text.split()
print("first_paragraph_text :", first_paragraph_text)
print("---------------")
print("first_paragraph_words :", first_paragraph_words)
print("---------------")

# 視為 dict 取出標籤
first_paragraph_id = soup.p['id']               # 若沒有 id，會送出 KeyError
first_paragraph_id2 = soup.p.text.get('id')     # 若沒有 id，會送回 None
print("first_paragraph_id :", first_paragraph_id)
print("---------------")
print("first_paragraph_id2 :", first_paragraph_id2)
print("---------------")

first_paragraph : <p>This domain is established to be used for illustrative examples in documents. You may use this
    domain in examples without prior coordination or asking for permission.</p>
---------------
first_paragraph_text : This domain is established to be used for illustrative examples in documents. You may use this
    domain in examples without prior coordination or asking for permission.
---------------
first_paragraph_words : ['This', 'domain', 'is', 'established', 'to', 'be', 'used', 'for', 'illustrative', 'examples', 'in', 'documents.', 'You', 'may', 'use', 'this', 'domain', 'in', 'examples', 'without', 'prior', 'coordination', 'or', 'asking', 'for', 'permission.']
---------------


KeyError: 'id'

In [37]:
# 一次取得多個標籤
all_paragraphs = soup.find_all('p')             # 也可只寫 soup('p')
paragraphs_with_ids = [p for p in soup('p') if p.get('id')]
print("all_paragraphs :", all_paragraphs)
print("---------------")
print("paragraphs_with_ids :", paragraphs_with_ids)
print("---------------")

# 找出某 class 的標籤
important_paragraphs = soup('p', {'class' : 'important'})       # 該段落沒有 class的屬性內容
important_paragraphs2 = soup('p', 'important')
important_paragraphs3 = [p for p in soup('p')
                        if 'important' in p.get('class', [])]
print("important_paragraphs :", important_paragraphs)
print("important_paragraphs2 :", important_paragraphs2)
print("important_paragraphs3 :", important_paragraphs3)
print("---------------")

# 警告 : 相同的 span 會被送回來好幾次
# 會同時出現在多個 div 之中
spans_inside_divs = [span
                    for div in soup('div')       # 針對頁面中的每個 <div>
                    for span in div('span')]     # 再找出每個 <span>
print("spans_inside_divs :", spans_inside_divs)

all_paragraphs : [<p>This domain is established to be used for illustrative examples in documents. You may use this
    domain in examples without prior coordination or asking for permission.</p>, <p><a href="http://www.iana.org/domains/example">More information...</a></p>]
---------------
paragraphs_with_ids : []
---------------
important_paragraphs : []
important_paragraphs2 : []
important_paragraphs3 : []
---------------
spans_inside_divs : []


In [39]:
from bs4 import BeautifulSoup
import requests
url = "https://www.safaribooksonline.com/search/?query=data"
soup = BeautifulSoup(requests.get(url).text, 'html5lib')

tds = soup('div', 'inline text')   # 把所有 class標示為 'inline text'的 div元素全找出來
print(len(tds))

4


In [41]:
# 篩選 html內的影片
def is_video(td):
    '''
    如果其中只包含一個標示為 pricelabel 的元素，而且
    清理過文字之後開頭為 'Video'即是影片
    '''
    pricelabels = td('span', 'pricelabel')
    return (len(pricelabels) == 1 and
           pricelabels[0].text.strip().startswith("Video"))    # strip 移除字串前後空白

print(len([td for td in tds if not is_video(td)]))        # 反覆迭代擷取

4


In [45]:
# 取出 title裡 td 開頭的標籤
title = td.find('div', 'inline text').a.text

author_name = td.find('div', 'AuthorName').text
authors = [x.strip() for x in re.sub("^By ", "", author_name).split(",")]

isbn_link = td.find("div", "thumbheader").a.get("href")

# re.match 抓出括號的正規化部分
isbn = re.match("/product/(.*)\.do", isbn_link).groups(1)

# 日期是包含在 <span class="directorydate"> 的內容中
data = td.find("span", "directorydate").text.strip()

def book_info(td):
    '''
    輸入一個 BeautifulSoup 處理過的 <td> 標籤，其內容代表一本書的相關訊息，
    從中萃取出書籍的細節資訊，然後以 dict 格式送回相關資訊
    '''
    title = td.find("div", "thumbheader").a.text
    by_author = td.find('div', 'AuthorName').text
    authors = [x.strip() for x in re.sub("By ", "", by_author).split(",")]
    isbn_link = td.find("div", "thumbheader").a.get("href")
    isbn = re.match("/product/(.*)\.do", isbn_link).groups()[0]    # 擷取第一個單詞做比對
    date = td.find("span", "directorydate").text.strip()
    
    return {
        "title" : title,
        "authors" : authors,
        "isbn" : isbn,
        "date" : date
    }

NameError: name 'td' is not defined

In [48]:
# 資料擷取
from bs4 import BeautifulSoup
import requests
from time import sleep

def scrape(num_pages=100):
    
    base_url = "https://www.safaribooksonline.com/search/?query=data&extended_publisher_data=/" + \
                "true&highlight=true&is_academic_institution_account=false&source=/" + \
                "user&include_assessments=false&include_case_studies=true&include_courses=/" + \
                "true&include_orioles=true&include_playlists=true&page="

    books = []

    NUM_PAGES = 100

    for page_num in range(1, NUM_PAGES + 1):
        print("souping page", page_num, ",", len(books), " found so far")
        url = base_url + str(page_num)
        soup = BeautifulSoup(requests.get(url).text, 'html5lib')

        for td in soup('div', 'inline text'):
            if not is_video(td):
                books.append(book_info(td))
        sleep(30)
        
    return books

In [55]:
def get_year(book):
    """book["date"] looks like 'November 2017' so we need to
    split on the space and then take the second piece"""
    return int(book["date"].split()[1])

def plot_years(plt, books):
    # 2017年
    year_counts = Counter(get_year(book) for book in books
                          if get_year(book) <= 2017)
    years = sorted(year_counts)                            # 計算 years
    book_counts = [year_counts[year] for year in years]    # 計算每一年的 books


    import matplotlib.pyplot as plt

    plt.plot(years, book_counts)
    plt.xlabel("year")
    plt.ylabel("# of data books")
    plt.title("Data is Big!")
    plt.show()

In [60]:
'''
API 的使用
'''
from dateutil.parser import parse
import json

endpoint = "https://api.github.com/users/joelgrus/repos"

repos = json.loads(requests.get(endpoint).text)
# print("repos :", repos)
dates = [parse(repo["created_at"]) for repo in repos]
print("dates :", dates)
print("---------------")

month_counts = Counter(date.month for date in dates)
print("month_counts :", month_counts)
print("---------------")

weekday_counts = Counter(date.weekday() for date in dates)
print("weekday_counts :", weekday_counts)

dates : [datetime.datetime(2017, 12, 2, 20, 13, 49, tzinfo=tzutc()), datetime.datetime(2013, 7, 5, 2, 2, 28, tzinfo=tzutc()), datetime.datetime(2017, 5, 10, 17, 22, 45, tzinfo=tzutc()), datetime.datetime(2013, 11, 15, 5, 33, 22, tzinfo=tzutc()), datetime.datetime(2012, 9, 18, 4, 20, 23, tzinfo=tzutc()), datetime.datetime(2016, 7, 19, 17, 34, 31, tzinfo=tzutc()), datetime.datetime(2015, 11, 11, 14, 15, 36, tzinfo=tzutc()), datetime.datetime(2016, 5, 31, 14, 33, 6, tzinfo=tzutc()), datetime.datetime(2015, 6, 30, 0, 33, 3, tzinfo=tzutc()), datetime.datetime(2013, 8, 21, 13, 26, 5, tzinfo=tzutc()), datetime.datetime(2013, 8, 18, 5, 3, 41, tzinfo=tzutc()), datetime.datetime(2015, 7, 30, 1, 54, 55, tzinfo=tzutc()), datetime.datetime(2014, 11, 9, 2, 31, 24, tzinfo=tzutc()), datetime.datetime(2013, 11, 10, 6, 52, 56, tzinfo=tzutc()), datetime.datetime(2015, 4, 8, 1, 1, 47, tzinfo=tzutc()), datetime.datetime(2016, 1, 8, 3, 33, 58, tzinfo=tzutc()), datetime.datetime(2016, 1, 21, 6, 46, 49, tzinf

In [61]:
# 去序列化 (deserialize)
import json
serialized = """{ "title" : "Data Science Book",
                "author" : "Joel Grus",
                "publicationYear" : 2014,
                "topics" : ["data", "science", "data science"] }"""

# 把 JSON 解析成一個 python dict
deserialized = json.loads(serialized)
if "data science" in deserialized["topics"]:
    print("deserialized :", deserialized)

deserialized : {'title': 'Data Science Book', 'author': 'Joel Grus', 'publicationYear': 2014, 'topics': ['data', 'science', 'data science']}


In [62]:
'''
不需身分認證的 API
'''
import requests, json
endpoint = "http://api.github.com/users/joelgrus/repos"

repos = json.loads(requests.get(endpoint).text)
print("repos :", repos)

repos : [{'id': 112873601, 'name': 'advent2017', 'full_name': 'joelgrus/advent2017', 'owner': {'login': 'joelgrus', 'id': 1308313, 'avatar_url': 'https://avatars1.githubusercontent.com/u/1308313?v=4', 'gravatar_id': '', 'url': 'https://api.github.com/users/joelgrus', 'html_url': 'https://github.com/joelgrus', 'followers_url': 'https://api.github.com/users/joelgrus/followers', 'following_url': 'https://api.github.com/users/joelgrus/following{/other_user}', 'gists_url': 'https://api.github.com/users/joelgrus/gists{/gist_id}', 'starred_url': 'https://api.github.com/users/joelgrus/starred{/owner}{/repo}', 'subscriptions_url': 'https://api.github.com/users/joelgrus/subscriptions', 'organizations_url': 'https://api.github.com/users/joelgrus/orgs', 'repos_url': 'https://api.github.com/users/joelgrus/repos', 'events_url': 'https://api.github.com/users/joelgrus/events{/privacy}', 'received_events_url': 'https://api.github.com/users/joelgrus/received_events', 'type': 'User', 'site_admin': False}

In [65]:
# 日期的語法解析
from dateutil.parser import parse

dates = [parse(repo["created_at"]) for repo in repos]
print("dates :", dates)
print("---------------")

month_counts = Counter(date.month for date in dates)
print("month_counts :", month_counts)
print("---------------")

weekday_counts = Counter(date.weekday() for date in dates)
print("weekday_counts :", weekday_counts)

dates : [datetime.datetime(2017, 12, 2, 20, 13, 49, tzinfo=tzutc()), datetime.datetime(2013, 7, 5, 2, 2, 28, tzinfo=tzutc()), datetime.datetime(2017, 5, 10, 17, 22, 45, tzinfo=tzutc()), datetime.datetime(2013, 11, 15, 5, 33, 22, tzinfo=tzutc()), datetime.datetime(2012, 9, 18, 4, 20, 23, tzinfo=tzutc()), datetime.datetime(2016, 7, 19, 17, 34, 31, tzinfo=tzutc()), datetime.datetime(2015, 11, 11, 14, 15, 36, tzinfo=tzutc()), datetime.datetime(2016, 5, 31, 14, 33, 6, tzinfo=tzutc()), datetime.datetime(2015, 6, 30, 0, 33, 3, tzinfo=tzutc()), datetime.datetime(2013, 8, 21, 13, 26, 5, tzinfo=tzutc()), datetime.datetime(2013, 8, 18, 5, 3, 41, tzinfo=tzutc()), datetime.datetime(2015, 7, 30, 1, 54, 55, tzinfo=tzutc()), datetime.datetime(2014, 11, 9, 2, 31, 24, tzinfo=tzutc()), datetime.datetime(2013, 11, 10, 6, 52, 56, tzinfo=tzutc()), datetime.datetime(2015, 4, 8, 1, 1, 47, tzinfo=tzutc()), datetime.datetime(2016, 1, 8, 3, 33, 58, tzinfo=tzutc()), datetime.datetime(2016, 1, 21, 6, 46, 49, tzinf

In [68]:
'''
使用 Twython
'''
from twython import Twython

# fill these in if you want to use the code
CONSUMER_KEY = ""
CONSUMER_SECRET = ""
ACCESS_TOKEN = ""
ACCESS_TOKEN_SECRET = ""

def call_twitter_search_api():

    twitter = Twython(CONSUMER_KEY, CONSUMER_SECRET)

    # search for tweets containing the phrase "data science"
    for status in twitter.search(q='"data science"')["statuses"]:
        user = status["user"]["screen_name"].encode('utf-8')
        text = status["text"].encode('utf-8')
        print(user, ":", text)
        print()
        
print("call_twitter_search_api :", call_twitter_search_api)

call_twitter_search_api : <function call_twitter_search_api at 0x00000198A2087F28>


In [93]:
'''
利用繼承 (inherit) 做 twython 擷取推文
'''
from twython import TwythonStreamer

# 把數據資料添加到一個廣域變數中，此種作法不好
# 但此範例較為簡單
tweets = []

class MyStreamer(TwythonStreamer):
    '''
    來自繼承自 TwythonStreamer的子物件類別
    用來設定如何與 stream 進行互動
    '''
    
    def on_success(self, data):
        '''
        twitter 送回資料，變成 dict 以表示推文
        '''
        # 只想要收集英文推文
        if data['lang'] == 'en':
            tweets.append(data)
            print("received tweet #", len(tweets))
            
        # 收集足夠就停下
        if len(tweets) >= 1000:
            self.disconnect()
            
    def on_error(self, status_code, data):
        print("status_code :", status_code)
        print("data :", data)
        self.disconnect()
        
print("MyStreamer :", MyStreamer)
print("tweets :", tweets)
print("---------------")

# 推文是英文就加入列表直到收集 1000次之後停止
def call_twitter_streaming_api():
    stream = MyStreamer(CONSUMER_KEY, CONSUMER_SECRET,
                        ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    # 開始取用包含關鍵字 'data' 的公開狀態 (statuses)
    stream.statuses.filter(track='data')

    # 如果是取用「所有的」公開狀態
    # stream.statuses.sample()
    
print("call_twitter_streaming_api :", call_twitter_streaming_api())

MyStreamer : <class '__main__.MyStreamer'>
tweets : []
---------------
status_code : 401
data : b'<html>\\n<head>\\n<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>\\n<title>Error 401 Unauthorized</title>\n</head>\n<body>\n<h2>HTTP ERROR: 401</h2>\n<p>Problem accessing \'/1.1/statuses/filter.json\'. Reason:\n<pre>    Unauthorized</pre>\n</body>\n</html>\n'
call_twitter_streaming_api : None


In [96]:
# 使用 hashtag
top_hashtags = Counter(hashtag['text'].lower()
                      for tweet in tweets
                      for hashtag in tweet["entities"]["hashtags"])

print(top_hashtags.most_common(5))

[]
