<img width="8%" alt="TikTok.png" src="https://raw.githubusercontent.com/jupyter-naas/awesome-notebooks/master/.github/assets/logos/TikTok.png" style="border-radius: 15%">

# TikTok - Get trending songs by keyword
<a href="https://bit.ly/3JyWIk6">Give Feedback</a> | <a href="https://github.com/jupyter-naas/awesome-notebooks/issues/new?assignees=&labels=bug&template=bug_report.md&title=TikTok+-+Get+trending+songs+by+keyword:+Error+short+description">Bug report</a>

**Tags:** #tiktok #content #trends #songs

**Author:** [Alex Nodeland](https://www.linkedin.com/in/alexnodeland/)

**Last update:** 2024-06-28 (Created: 2024-06-20)

**Description:** This notebook demonstrates how to extract the top trending songs on TikTok by keyword, using the Apify TikTok Scraper. To use this template, you need a free Apify account, and you must [rent the Actor](https://console.apify.com/actors/GdWCkxBtKWOsKjdch/input).

**References:**
- [Naas Documentation](https://site.naas.ai/)
- [Apify TikTok Scraper Documentation](https://apify.com/clockworks/tiktok-scraper)

## Input

### Import libraries

In [None]:
from datetime import datetime
import nest_asyncio
import pandas as pd
try:
    from apify_client import ApifyClient
except ImportError:
    # Install the Apify client on first run, then import it
    !pip install --user apify-client
    from apify_client import ApifyClient
nest_asyncio.apply()

### Setup variables

**Mandatory**
- `APIFY_API_TOKEN`: Your Apify API token, required to call the TikTok Scraper Actor. You can find it in the [Apify console integration settings](https://console.apify.com/settings/integrations).
- `searchQueries`: List of keywords to search TikTok for. The songs used in the matching videos are extracted.
- `csv_output`: Path of the CSV file where the results are saved.

**Optional**

- `hashtags`: List of hashtags to search TikTok videos for. Left empty (`[]`) by default.
- `resultsPerPage`: Number of videos to scrape per hashtag or search query. Set to `5` in this notebook; increase it to scan more videos.
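The mandatory and optional variables end up in a single Actor input dictionary. As a minimal sketch, assuming you would rather read the token from an environment variable named `APIFY_API_TOKEN` than hard-code it, and using hypothetical helpers `get_api_token` and `build_run_input` (neither is part of the notebook or of the Apify client), the mapping plus a basic check of the mandatory fields could look like this:

```python
import os


def get_api_token(env_var: str = "APIFY_API_TOKEN") -> str:
    """Hypothetical helper: read the Apify token from the environment (assumed to be exported there)."""
    token = os.environ.get(env_var, "")
    if not token:
        raise ValueError(f"Set the {env_var} environment variable first")
    return token


def build_run_input(search_queries, hashtags=None, results_per_page=5) -> dict:
    """Hypothetical helper: assemble the TikTok Scraper input from the notebook variables."""
    if not search_queries:
        raise ValueError("searchQueries is mandatory: pass at least one keyword")
    if int(results_per_page) < 1:
        raise ValueError("resultsPerPage must be a positive integer")
    return {
        # Optional: left empty unless you also want to search by hashtag
        "hashtags": list(hashtags or []),
        "resultsPerPage": int(results_per_page),
        # Mandatory: the keywords to search for
        "searchQueries": list(search_queries),
    }


run_input = build_run_input(["afrobeat"])
print(run_input)
```

Validating the input before calling the Actor means a typo in the variables fails immediately, instead of after a paid run.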

#### Apify API Token & Query Details

In [None]:
# Mandatory
APIFY_API_TOKEN = 'apify_api_xxxx'
searchQueries = ['afrobeat']
csv_output = 'trending_songs.csv'

# Optional - modify if desired
hashtags = []
resultsPerPage = 5

# Prepare the actor input
RUN_INPUT = {
    "hashtags": hashtags,
    "resultsPerPage": resultsPerPage,
    "searchQueries": searchQueries
}

## Model

### Create connection object

In [None]:
client = ApifyClient(APIFY_API_TOKEN)

### Get data from Apify TikTok Scraper API

In [None]:
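def get_songs_by_keyword(run_input: dict) -> pd.DataFrame:
    """
    Run the Apify TikTok Scraper and return the scraped videos

    Args:
        run_input (dict): Actor input with the search queries, hashtags and results per page

    Returns:
        response (pd.DataFrame): DataFrame with one row per video (empty if the run fails)
    """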
    print(f"Searching for songs with keyword: {run_input['searchQueries']}")
    
    try:
        # Run the Actor and wait for it to finish
        run = client.actor("clockworks/tiktok-scraper").call(run_input=run_input)
        print(f"Actor run completed successfully, with id: {run['id']}")

        # Fetch Actor results from the run's dataset
        items = list(client.dataset(run["defaultDatasetId"]).iterate_items())

        # Convert the list of items to a DataFrame
        response = pd.DataFrame(items)
    except Exception as e:
        print(f"An error occurred while running the actor: {e}")
        response = pd.DataFrame()

    return response
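`get_songs_by_keyword` relies on two calls of the Apify Python client: `client.actor(...).call(run_input=...)`, which starts the Actor and waits for it to finish, and `client.dataset(...).iterate_items()`, which reads the results from the run's default dataset. To check this flow offline, without spending Apify credits, one option is to pass the client in as a parameter and swap it for a stub. In the sketch below, `FakeApifyClient`, `fetch_items` and the sample item are hypothetical; the stub only mimics the shape of those two calls. It returns a plain list, which the notebook would then wrap with `pd.DataFrame(items)`.

```python
class _FakeDataset:
    def __init__(self, items):
        self._items = items

    def iterate_items(self):
        # Yield items one by one, like the real dataset client
        yield from self._items


class _FakeActor:
    def __init__(self, run):
        self._run = run

    def call(self, run_input=None):
        # Stand-in for the real call(), which blocks until the run ends and returns run metadata
        return self._run


class FakeApifyClient:
    """Hypothetical stub exposing only actor().call() and dataset().iterate_items()."""

    def __init__(self, items):
        self._run = {"id": "fake-run", "defaultDatasetId": "fake-dataset"}
        self._items = items

    def actor(self, actor_id):
        return _FakeActor(self._run)

    def dataset(self, dataset_id):
        return _FakeDataset(self._items)


def fetch_items(client, run_input):
    """Same flow as get_songs_by_keyword, with the client injected as a parameter."""
    run = client.actor("clockworks/tiktok-scraper").call(run_input=run_input)
    if run is None:
        # Guard against a missing run object instead of failing on run["defaultDatasetId"]
        return []
    return list(client.dataset(run["defaultDatasetId"]).iterate_items())


sample = [{"id": "1", "musicMeta": {"musicName": "Song A"}}]
items = fetch_items(FakeApifyClient(sample), {"searchQueries": ["afrobeat"]})
print(len(items))
```

In the notebook itself, the real `client` created above would be passed instead of the stub.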

### Extract data from the response

In [None]:
# Instantiate the ABI Content Table Data Model as a pandas dataframe
attributes = {
    "ENTITY": [],
    "SCENARIO": [],
    "SOURCE": [],
    "PUBLISHED_DATE": [],
    "DATE & TIME": [],
    "ID": [],
    "TITLE": [],
    "TEXT": [],
    "CONCEPT": [],
    "SENTIMENT": [],
    "TARGET": [],
    "OBJECTIVE": [],
    "VIEWS": [],
    "LIKES": [],
    "COMMENTS": [],
    "SHARES": [],
    "ENGAGEMENTS": [],
    "ENGAGEMENT_SCORE": [],
    "TYPE": [],
    "AUTHOR_NAME": [],
    "AUTHOR_URL": [],
    "LENGTH": [],
    "PEOPLE_MENTIONED": [],
    "ORGANIZATION_MENTIONED": [],
    "CONTENT_TITLE_SHARED": [],
    "CONTENT_URL_SHARED": [],
    "LINKEDIN_LINKS": [],
    "IMAGE_SHARED": [],
    "TAGS": [],
    "URL": [],
    "DATE_EXTRACT": [],
    "SCENARIO_ORDER": [],
    "MUSIC_NAME": [],
    "MUSIC_AUTHOR": [],
    "MUSIC_ORIGINAL": [],
    "MUSIC_ALBUM": [],
    "MUSIC_PLAY_URL": [],
    "MUSIC_COVER_MEDIUM_URL": [],
    "MUSIC_ID": [],
}
content_template = pd.DataFrame(attributes)
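The template is, in effect, an ordered list of column names. Each extracted record is laid out against it, so the columns that `extract_data` does not fill (`CONCEPT`, `SENTIMENT`, `ENGAGEMENT_SCORE` and the other commented-out fields) still appear, empty, in the CSV. A minimal, pandas-free sketch of that alignment, using a hypothetical `align_to_template` helper and a shortened column list, and assuming that keys absent from the template should be dropped:

```python
# Shortened, illustrative subset of the template columns
TEMPLATE_COLUMNS = ["ENTITY", "SOURCE", "CONCEPT", "VIEWS", "MUSIC_NAME"]


def align_to_template(record: dict, columns: list) -> dict:
    """Return a dict with exactly the template columns, in template order.

    Missing columns are set to None; keys that are not in the template are dropped.
    """
    return {column: record.get(column) for column in columns}


row = align_to_template({"SOURCE": "TikTok", "VIEWS": 1200, "EXTRA": "ignored"}, TEMPLATE_COLUMNS)
print(list(row))
```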

def extract_data(response: pd.DataFrame, content_template: pd.DataFrame) -> pd.DataFrame:
    """
    Extract data from the response into the content template

    Args:
        response (pd.DataFrame): DataFrame with the raw videos returned by the Actor
        content_template (pd.DataFrame): Empty DataFrame defining the output columns

    Returns:
        content (pd.DataFrame): DataFrame with extracted data, one row per video
    """
    print("Extracting data from response")
    content = content_template.copy()
    if response.empty:
        print("The response is empty: nothing to extract")
        return content

    # Take the extraction timestamp once, in UTC, to match the trailing "Z" in the format
    now = pd.Timestamp.now(tz="UTC")
    try:
        extracted = response.apply(lambda row: pd.Series({
            "ENTITY": row['musicMeta']['musicName'],
            "SCENARIO": f"Trending {row['searchQuery']} Songs on TikTok",
            "SOURCE": "TikTok",
            "PUBLISHED_DATE": row['createTimeISO'],
            "DATE & TIME": now.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
            "ID": row['id'],
            "TITLE": row['musicMeta']['musicName'],
            "TEXT": row['text'],
            # "CONCEPT": "",
            # "SENTIMENT": "",
            # "TARGET": "",
            # "OBJECTIVE": "",
            "VIEWS": row['playCount'],
            "LIKES": row['diggCount'],
            "COMMENTS": row['commentCount'],
            "SHARES": row['shareCount'],
            # Engagements = likes + comments + shares + saves (collectCount)
            "ENGAGEMENTS": row['diggCount'] + row['commentCount'] + row['shareCount'] + row['collectCount'],
            # "ENGAGEMENT_SCORE": "",
            "TYPE": "video",
            "AUTHOR_NAME": row['authorMeta']['nickName'],
            "AUTHOR_URL": f"https://www.tiktok.com/@{row['authorMeta']['name']}",
            "LENGTH": row['videoMeta']['duration'],
            # Mentions and hashtags can be missing, or NaN once loaded into the DataFrame
            "PEOPLE_MENTIONED": ", ".join(row['mentions']) if isinstance(row.get('mentions'), list) else '',
            # "ORGANIZATION_MENTIONED": "",
            # "CONTENT_TITLE_SHARED": "",
            # "CONTENT_URL_SHARED": "",
            # "LINKEDIN_LINKS": "",
            "IMAGE_SHARED": row['videoMeta']['coverUrl'],
            "TAGS": ", ".join(tag['name'] for tag in row['hashtags']) if isinstance(row.get('hashtags'), list) else '',
            "URL": row['webVideoUrl'],
            "DATE_EXTRACT": now.strftime("%Y-%m-%d"),
            # "SCENARIO_ORDER": "",
            "MUSIC_NAME": row['musicMeta']['musicName'],
            "MUSIC_AUTHOR": row['musicMeta']['musicAuthor'],
            "MUSIC_ORIGINAL": row['musicMeta']['musicOriginal'],
            "MUSIC_ALBUM": row['musicMeta']['musicAlbum'],
            "MUSIC_PLAY_URL": row['musicMeta']['playUrl'],
            "MUSIC_COVER_MEDIUM_URL": row['musicMeta']['coverMediumUrl'],
            "MUSIC_ID": row['musicMeta']['musicId'],
        }), axis=1)
        # Align on the template so every column exists, in the template's order
        content = extracted.reindex(columns=content_template.columns)
        print("Data extracted successfully")
    except Exception as e:
        print(f"An error occurred while extracting data: {e}")
    return content
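`extract_data` indexes nested fields directly (for example `row['musicMeta']['musicName']`), so a single item missing one key sends the whole extraction into the `except` branch. A more tolerant variant maps each raw item on its own and falls back to empty values. The sketch below does that for a handful of the template columns; `nested` and `map_item` are hypothetical helpers, and `sample_item` is made up for illustration, reusing only field names that `extract_data` already reads.

```python
def nested(item: dict, *keys, default=None):
    """Follow a chain of dict keys, returning `default` as soon as one is missing."""
    value = item
    for key in keys:
        if not isinstance(value, dict) or key not in value:
            return default
        value = value[key]
    return value


def map_item(item: dict) -> dict:
    """Map one raw TikTok Scraper item to a subset of the template columns."""
    # Likes + comments + shares + saves, as in extract_data; missing counts count as 0
    counts = [item.get(k) or 0 for k in ("diggCount", "commentCount", "shareCount", "collectCount")]
    author_name = nested(item, "authorMeta", "name")
    hashtags = item.get("hashtags") or []
    return {
        "ID": item.get("id"),
        "MUSIC_NAME": nested(item, "musicMeta", "musicName", default=""),
        "MUSIC_AUTHOR": nested(item, "musicMeta", "musicAuthor", default=""),
        "AUTHOR_URL": f"https://www.tiktok.com/@{author_name}" if author_name else "",
        "VIEWS": item.get("playCount") or 0,
        "ENGAGEMENTS": sum(counts),
        # Keep only well-formed hashtag entries
        "TAGS": ", ".join(tag["name"] for tag in hashtags if isinstance(tag, dict) and "name" in tag),
    }


sample_item = {
    "id": "123",
    "playCount": 1000,
    "diggCount": 50, "commentCount": 5, "shareCount": 3, "collectCount": 2,
    "musicMeta": {"musicName": "Song A"},  # musicAuthor deliberately missing
    "authorMeta": {"name": "someone"},
    "hashtags": [{"name": "afrobeat"}, {"name": "dance"}],
}
print(map_item(sample_item))
```

Mapping item by item means one malformed video only loses its own missing fields instead of emptying the whole output.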

## Output

### Call the function

In [None]:
print(f"The input is:\n{RUN_INPUT}")
response = get_songs_by_keyword(RUN_INPUT)

### Extract the output from the response

In [None]:
content = extract_data(response, content_template)

# Save the extracted data to a CSV file
content.to_csv(csv_output, index=False)
content
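The CSV has one row per video, so the same song can appear several times. To get a per-song view of what is trending, one option is to group the rows by `MUSIC_ID` and sum their `VIEWS` and `ENGAGEMENTS`. The sketch below does this with the standard library on rows shaped like the CSV (for example rows read with `csv.DictReader`). Ranking by total views is an assumption, not the notebook's own definition of "trending", and `rank_songs`, `to_number` and the sample rows are hypothetical.

```python
import csv
import io
from collections import defaultdict


def to_number(value) -> float:
    """CSV cells are strings; treat blanks or missing cells as 0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def rank_songs(rows, by="VIEWS"):
    """Aggregate per-video rows into per-song totals, sorted by `by` (descending)."""
    songs = defaultdict(lambda: {"MUSIC_NAME": "", "VIDEOS": 0, "VIEWS": 0.0, "ENGAGEMENTS": 0.0})
    for row in rows:
        song = songs[row["MUSIC_ID"]]
        song["MUSIC_NAME"] = row.get("MUSIC_NAME") or song["MUSIC_NAME"]
        song["VIDEOS"] += 1
        song["VIEWS"] += to_number(row.get("VIEWS"))
        song["ENGAGEMENTS"] += to_number(row.get("ENGAGEMENTS"))
    ranked = [{"MUSIC_ID": music_id, **totals} for music_id, totals in songs.items()]
    return sorted(ranked, key=lambda song: song[by], reverse=True)


# Made-up rows in the same shape as the notebook's CSV output
sample_csv = io.StringIO(
    "MUSIC_ID,MUSIC_NAME,VIEWS,ENGAGEMENTS\n"
    "m1,Song A,1000,60\n"
    "m2,Song B,5000,300\n"
    "m1,Song A,2500,40\n"
)
for song in rank_songs(csv.DictReader(sample_csv)):
    print(song["MUSIC_NAME"], song["VIDEOS"], song["VIEWS"])
```

With the notebook's own output, the rows could come from `csv.DictReader(open(csv_output, newline=""))`; passing `by="ENGAGEMENTS"` ranks songs by interaction instead of reach.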